316 lines
13 KiB
Python
316 lines
13 KiB
Python
from os import environ as env
|
|
from sys import argv
|
|
from slack_bolt import App
|
|
from slack_sdk.errors import SlackApiError
|
|
from dotenv import load_dotenv
|
|
import firepup650 as fp
|
|
from traceback import format_exc
|
|
from time import sleep
|
|
from base64 import b64encode
|
|
from typing import NoReturn
|
|
|
|
# Shadow the builtin input() with firepup650's replitInput for the rest of
# this module; every prompt below goes through it.
input = fp.replitInput

# Presumably the prompt marker replitInput displays - confirm against firepup650.
fp.replitCursor = f"{fp.bcolors.REPLIT}>>>{fp.bcolors.RESET}"  # Totally not hijacking one of my functions to use ;P

# Load variables from a .env file (if any) before validating the environment.
load_dotenv()

# Refuse to start when a mandatory variable is unset or empty.
for requiredVar in ["SLACK_BOT_TOKEN"]:
    if env.get(requiredVar):
        continue
    raise ValueError(
        f'Missing required environment variable "{requiredVar}". Please create a .env file in the same directory as this script and define the missing variable.'
    )

print("[INFO] Establishing a connection to slack...")
# The token was validated just above, so plain indexing cannot fail here.
app = App(token=env["SLACK_BOT_TOKEN"])
client = app.client
|
|
|
|
|
|
def encode(string: str) -> str:
    """Return the Base64 encoding of *string* (encoded as UTF-8) as text."""
    raw = string.encode("utf-8")
    return str(b64encode(raw), "utf-8")
|
|
|
|
|
|
def __writeCache(userCache, botCache, cursorCache):
    """Persist the user/bot mappings and the resume cursor to ``cache.py``.

    The file is written as importable Python source defining
    ``userMappings``, ``botMappings`` and ``cursorCache``.

    Args:
        userCache: mapping written verbatim (via its repr) as ``userMappings``.
        botCache: mapping written verbatim (via its repr) as ``botMappings``.
        cursorCache: cursor text, written inside double quotes.
    """
    # It is many times faster to load from a local file instead of from slack.
    # Python reads source files as UTF-8 by default, so write UTF-8 explicitly:
    # display names may contain non-ASCII characters, and the platform's locale
    # encoding could otherwise fail to write them or produce an unimportable file.
    with open("cache.py", "w", encoding="utf-8") as cacheFile:
        cacheFile.writelines(
            [
                f"userMappings = {userCache}\n",
                f"botMappings = {botCache}\n",
                f'cursorCache = "{cursorCache}"\n',
            ]
        )
    print("[INFO] Cache saved.")
|
|
|
|
|
|
def __generateCache(userCache, botCache, cursor):
    """Fetch users from Slack page by page and merge them into the caches.

    Args:
        userCache: dict mapping ``"<@ID>"`` to ``"<@ID|name>"``; updated in place.
        botCache: dict mapping a bot ID to that bot's user ID; updated in place.
        cursor: pagination cursor to resume from, or ``"N/A"`` for a full load.

    Returns:
        ``(userCache, botCache, cursorCache)``, where ``cursorCache`` is the
        Base64 text of ``"user:<id of the last user received>"``.

    Raises:
        SystemExit: if Slack returns no users at all.
        SlackApiError: if Slack reports an error that carries no retry-after
            header (i.e. anything that is not a rate limit).
    """
    resuming = cursor != "N/A"
    users_list = []
    pages = 0
    while cursor:  # If slack gives us a cursor, then we ain't done loading user data yet
        data = None
        while data is None:  # Ratelimit logic
            try:
                if cursor != "N/A":
                    data = client.users_list(cursor=cursor, limit=1000)
                else:
                    data = client.users_list(limit=1000)
            except SlackApiError as e:
                # Header-name casing is not guaranteed, so look it up
                # case-insensitively.
                headers = {str(k).lower(): v for k, v in e.response.headers.items()}
                retry = headers.get("retry-after")
                if retry is None:
                    # Not a rate limit: surface the real Slack error instead of
                    # hiding it behind a KeyError.
                    raise
                print(
                    f"[WARN] Ratelimit hit! Sleeping for {retry} seconds as the retry-after header has specified"
                )
                sleep(int(retry))
                print("[WARN] Resuming..")
        cursor = data["response_metadata"]["next_cursor"]
        users_list.extend(data["members"])
        pages += 1
        print(
            f"[INFO] Pages of users loaded: {pages} ({'User count is less than' if not cursor else 'Estimated user count so far:'} {pages}000)"
        )
    if not users_list:
        raise SystemExit(
            "[EXIT] Slack returned exactly zero users when given a cursor, which means my cursor is corrupt. Please delete cache.py and re-run the script."
        )
    # The script rebuilds a cursor for the last user seen so the next run can
    # resume from there. NOTE(review): this relies on Slack's cursor being the
    # Base64 of "user:<id>" - confirm against Slack's pagination behaviour.
    cursorCache = encode(f"user:{users_list[-1]['id']}")
    if resuming and len(users_list) == 1:
        # The code treats a single result on resume as "only the user the
        # cursor points at". On a fresh load a single user is still new data.
        print("[INFO] No new users to load.")
        return userCache, botCache, cursorCache
    print("[INFO] Building user and bot mappings now, this shouldn't take long...")
    # Map user ID mentions to user ID + name mentions, it's nicer when printing messages.
    for user in users_list:
        profile = user["profile"]
        # Prefer the display name, then the real name, then the raw name.
        name = (
            profile.get("display_name_normalized")
            or profile.get("real_name_normalized")
            or user["name"]
        )
        userCache[f"<@{user['id']}>"] = f"<@{user['id']}|{name}>"
        bot_id = profile.get("bot_id")
        if user.get("is_bot") and bot_id:
            botCache[bot_id] = user["id"]
    return userCache, botCache, cursorCache
|
|
|
|
|
|
def __innerMessageParser(message: dict) -> dict:
    """Ensure *message* has a ``"user"`` entry, filling it in when absent.

    Messages that carry a ``bot_id`` but no ``user`` are resolved through
    ``botMappings``; unknown bots and any other gaps get placeholder text.

    Args:
        message: a message dict; it is modified in place.

    Returns:
        The same *message* dict, now with a non-empty ``"user"`` value.
    """
    try:
        if not message.get("user") and message.get("bot_id"):  # Apps sometimes don't...
            bot_id = message["bot_id"]
            if botMappings.get(bot_id):
                message["user"] = botMappings[bot_id]
            else:
                # This string needs the f prefix; without it the literal
                # text "{bot_id}" is printed instead of the ID.
                print(
                    f"[WARN] Unknown bot {bot_id}!\n"
                    "[WARN] Cache may be out of date!"
                )
                message["user"] = f"{bot_id}|UNKNOWN BOT"
    except Exception:
        # Best effort: report the problem and keep going with a placeholder.
        print("[WARN] Exception")
        for line in format_exc().split("\n")[:-1]:
            print(f"[WARN] {line}")
        print(f"[HELP] Raw message that caused this error: {message}")
        message["user"] = "AN EXCEPTION OCCURED|UNKOWN USER"
    if not message.get("user"):
        # Neither a user nor a bot ID was present; dump the message for debugging.
        print(message)
        message["user"] = "FALLBACK|UNKNOWN USER"
    return message
|
|
|
|
|
|
def buildThreadedMessages(messages: list) -> dict:
    """Build menu labels for *messages*, keyed to their position in the list.

    Args:
        messages: sequence of message dicts (each needs ``ts`` and ``text``).

    Returns:
        Dict mapping ``"[ts] <@user|name>: text"`` labels to the index of the
        corresponding message in *messages*.
    """
    import re  # Local import keeps this function self-contained.

    print("[INFO] Building messages, this might take a little bit...")
    # One regex pass with a dict lookup per mention, instead of one
    # str.replace per cached user for every message.
    mention = re.compile(r"<@[^<>]+>")

    def expand(match):
        found = match.group(0)
        return userMappings.get(found, found)

    texts = {}
    for i, raw in enumerate(messages):
        message = __innerMessageParser(raw)
        label = f'[{message["ts"]}] <@{message["user"]}>: {message["text"]}'
        texts[mention.sub(expand, label)] = i
    return texts
|
|
|
|
|
|
def buildMessages(messages: list) -> str:
    """Print *messages* from last to first and return the first one's ``ts``.

    Args:
        messages: sequence of message dicts (each needs ``ts`` and ``text``).

    Returns:
        ``messages[0]["ts"]``, or ``""`` when *messages* is empty.
    """
    import re  # Local import keeps this function self-contained.

    print("[INFO] Building messages, this might take a little bit...")
    if not messages:
        # Nothing to print; avoid an IndexError on messages[0] below.
        return ""
    # One regex pass with a dict lookup per mention, instead of one
    # str.replace per cached user for every message.
    mention = re.compile(r"<@[^<>]+>")

    def expand(match):
        found = match.group(0)
        return userMappings.get(found, found)

    for raw in reversed(messages):
        message = __innerMessageParser(raw)
        msg = f'[MSGS] [{message["ts"]}] <@{message["user"]}>: {message["text"]}'
        print(mention.sub(expand, msg))
    return messages[0]["ts"]
|
|
|
|
|
|
def __printException(header: str) -> None:
    """Print *header*, then the active traceback with each line tagged [WARN]."""
    print(header)
    for line in format_exc().split("\n")[:-1]:
        print(f"[WARN] {line}")


def __loadMessagesSince(chan: str, oldest_ts: str) -> str:
    """Print channel messages newer than *oldest_ts*.

    Returns the ``ts`` to use as the next starting point: the first message in
    the response when something new arrived, otherwise *oldest_ts* unchanged.
    Failures are reported and swallowed so the prompt loop keeps running.
    """
    try:
        print(f"[INFO] Trying to load messages since {oldest_ts}...")
        res = client.conversations_history(
            channel=chan, inclusive=True, limit=200, oldest=oldest_ts
        )
        # The request is inclusive, so the last entry is dropped as the
        # message at oldest_ts that was already shown.
        if len(res["messages"]) > 1:
            buildMessages(res["messages"][:-1])
            oldest_ts = res["messages"][0]["ts"]
        else:
            print("[INFO] No new messages")
    except Exception:
        __printException("[WARN] Exception")
        print("[HELP] Does the bot have access to the channel you're trying to see?")
    return oldest_ts


def messaging() -> NoReturn:
    """Run the interactive prompt loop for sending messages; never returns.

    Outer loop: ask for a channel ID and show its last 50 messages.
    Inner loop: optionally pick a thread (by menu or by TS ID) and send
    replies to it, or send a message to the channel. An empty channel message
    sends nothing and just refreshes. ^C leaves the current level (thread,
    then channel).
    """
    while True:
        chan = input("Channel ID")
        try:
            oldest_ts = ""
            try:
                print(
                    "[INFO] Trying to load the last 50 messages sent in this channel..."
                )
                res = client.conversations_history(
                    channel=chan, inclusive=True, limit=50
                )
                if res["messages"]:
                    buildMessages(res["messages"])
                    oldest_ts = res["messages"][0]["ts"]
                else:
                    # An empty channel is not an access problem; don't let an
                    # IndexError trigger the misleading help text below.
                    print("[INFO] No messages in this channel yet")
            except Exception:
                __printException("[WARN] Exception")
                print(
                    "[HELP] The bot probably isn't in this channel. If it's public you can likely send anyways, but this will fail otherwise."
                )
            print("[INFO] ^C to change channel")
            while True:
                thread = input("Reply to a thread? (y|N)").lower().startswith("y")
                ts = None
                if thread:
                    hasID = input("Do you have the TS ID? (y|N)").lower().startswith("y")
                    if hasID:
                        ts = input("TS ID")
                    else:
                        try:
                            print(
                                "[INFO] Getting the last 50 messages for threading options..."
                            )
                            res = client.conversations_history(
                                channel=chan, inclusive=True, limit=50
                            )
                            messages = res["messages"]
                            texts = buildThreadedMessages(messages)
                            choice = fp.menu(
                                texts,
                                "Please select the message to reply to as a thread",
                            )
                            ts = messages[choice]["ts"]
                        except Exception:
                            __printException("[WARN] Exception:")
                            print(
                                "[HELP] Does the bot have access to the channel you're trying to see?"
                            )
                            break  # Back to the channel prompt
                    print(
                        "[INFO] ^C to change/exit thread (^C twice if you want to change channel)"
                    )
                    try:
                        while True:
                            msg = input(
                                "[THRD] Message (Raw text, not blocks)"
                            ).replace("\\n", "\n")
                            try:
                                client.chat_postMessage(
                                    channel=chan, text=msg, thread_ts=ts
                                )
                                print("[INFO] Message sent (to the thread)!")
                            except Exception:
                                __printException("[WARN] Exception:")
                                break  # Leave the thread after a failed send
                    except KeyboardInterrupt:
                        print()
                if ts:
                    # A thread session just ended: catch up on the channel and
                    # go back to the thread/channel question.
                    oldest_ts = __loadMessagesSince(chan, oldest_ts)
                    continue
                msg = input("[CHAN] Message (Raw text, not blocks)").replace(
                    "\\n", "\n"
                )
                try:
                    if msg != "":
                        ts = client.chat_postMessage(channel=chan, text=msg)["ts"]
                        print(f"[INFO] Message sent (to the channel)! (TS ID: {ts})")
                except Exception:
                    __printException("[WARN] Exception:")
                    break  # Back to the channel prompt
                else:
                    oldest_ts = __loadMessagesSince(chan, oldest_ts)
        except KeyboardInterrupt:
            print()
    # The below code will never run, but linters are dumb and need to be assured there is no possible return from a `NoReturn` function.
    raise SystemExit(1)
|
|
|
|
|
|
# Mention/bot lookup tables used by the message builders, plus the resume cursor.
userMappings = {}
botMappings = {}
cursor = "N/A"
try:
    if "--no-cache" in argv:
        print("[INFO] Skipping cache on user request")
        # Reuse the cache-miss path below to force a full reload.
        raise ImportError("User requested to skip cache")
    print("[INFO] Trying to load user and app mappings from cache...")
    # NOTE: importing cache.py executes it as Python code; only use a cache
    # file that this script wrote itself.
    from cache import userMappings, cursorCache, botMappings

    print(
        "[INFO] Cache load OK.\n"
        "[INFO] Reminder: If you need to regenerate the cache, call the script with `--no-cache`"
    )
    print("[INFO] Checking for slack users newer than my cache...")
    userMappings, botMappings, cursor = __generateCache(
        userMappings, botMappings, cursorCache
    )
    if cursor != cursorCache:
        print("[INFO] New user and app mappings generated, writing cache file now...")
        __writeCache(userMappings, botMappings, cursor)
except (ImportError, SyntaxError):
    # ImportError: cache.py is missing, lacks a name, or was skipped on request.
    # SyntaxError: cache.py is truncated or corrupt; rebuild it instead of crashing.
    print("[WARN] Cache load failed, falling back to full load from slack...")
    userMappings, botMappings, cursor = __generateCache({}, {}, "N/A")
    print("[INFO] All user and app mappings generated, writing cache file now...")
    __writeCache(userMappings, botMappings, cursor)

print("[INFO] User mappings loaded. User count:", len(userMappings))
print("[INFO] Bot mappings loaded. Bot count:", len(botMappings))

if __name__ == "__main__":
    print("[INFO] ^D at any time to terminate program")
    messaging()
|