slack-app-cli/main-v2.py

316 lines
13 KiB
Python

import re
from base64 import b64encode
from os import environ as env
from sys import argv
from time import sleep
from traceback import format_exc
from typing import NoReturn

import firepup650 as fp
from dotenv import load_dotenv
from slack_bolt import App
from slack_sdk.errors import SlackApiError
# All prompts in this script go through firepup650's replitInput instead of the builtin.
input = fp.replitInput
# Totally not hijacking one of my functions to use ;P
fp.replitCursor = fp.bcolors.REPLIT + ">>>" + fp.bcolors.RESET

# Read a local .env file (if any) before checking that the required settings exist.
load_dotenv()
for requiredVar in ("SLACK_BOT_TOKEN",):
    if env.get(requiredVar):
        continue
    raise ValueError(
        f'Missing required environment variable "{requiredVar}". Please create a .env file in the same directory as this script and define the missing variable.'
    )

print("[INFO] Establishing a connection to slack...")
app = App(token=env.get("SLACK_BOT_TOKEN"))
# Every Slack API call below goes through this web client.
client = app.client
def encode(string: str) -> str:
    """Return *string* base64-encoded, as text (UTF-8 in, UTF-8 out)."""
    utf8_bytes = string.encode("utf-8")
    encoded = b64encode(utf8_bytes)
    return encoded.decode("utf-8")
def __writeCache(userCache, botCache, cursorCache):
    """Write the mappings and the pagination cursor to cache.py as Python source.

    Args:
        userCache: mapping written out as the module-level name ``userMappings``.
        botCache: mapping written out as the module-level name ``botMappings``.
        cursorCache: cursor string written out (double-quoted) as ``cursorCache``.

    The file is written explicitly as UTF-8: the script later loads it with
    ``from cache import ...`` and Python reads source files as UTF-8, so relying
    on the platform default encoding can break on names with non-ASCII characters.
    """
    # It is many times faster to load from a local file instead of from slack
    with open("cache.py", "w", encoding="utf-8") as cacheFile:
        cacheFile.writelines(
            [
                f"userMappings = {userCache}\n",
                f"botMappings = {botCache}\n",
                f'cursorCache = "{cursorCache}"\n',
            ]
        )
    print("[INFO] Cache saved.")
def __generateCache(userCache, botCache, cursor):
    """Load users from Slack page by page and fold them into the mapping caches.

    Args:
        userCache: dict of "<@ID>" -> "<@ID|name>" mention rewrites; updated in place.
        botCache: dict of bot_id -> user ID; updated in place.
        cursor: cursor passed to ``users_list`` to resume from, or "N/A" to
            request the first page without a cursor.

    Returns:
        ``(userCache, botCache, cursorCache)`` where ``cursorCache`` is the
        base64 of ``"user:<id of the last user loaded>"``.

    Raises:
        SlackApiError: when Slack rejects a request and the response carries no
            retry-after header, i.e. the failure is not a rate limit.

    Exits the process (via ``exit``) if Slack returns no users at all.
    """
    users_list = []
    pages = 0
    # If slack gives us a cursor, then we ain't done loading user data yet
    while cursor:
        data = None
        while not data:  # Ratelimit logic
            try:
                if cursor != "N/A":
                    data = client.users_list(cursor=cursor, limit=1000)
                else:
                    data = client.users_list(limit=1000)
            except SlackApiError as e:
                headers = e.response.headers
                retry = headers.get("retry-after", headers.get("Retry-After"))
                if retry is None:
                    # No retry-after header means this is not a rate limit;
                    # sleeping and retrying cannot help, so surface the real error.
                    raise
                print(
                    f"[WARN] Ratelimit hit! Sleeping for {retry} seconds as the retry-after header has specified"
                )
                sleep(int(retry))
                print("[WARN] Resuming..")
        cursor = data["response_metadata"]["next_cursor"]
        users_list.extend(data["members"])
        pages += 1
        print(
            f"[INFO] Pages of users loaded: {pages} ({'User count is less than' if not cursor else 'Estimated user count so far:'} {pages}000)"
        )
    if not users_list:
        exit(
            "[EXIT] Slack returned exactly zero users when given a cursor, which means my cursor is corrupt. Please delete cache.py and re-run the script."
        )
    cursorCache = encode(f"user:{users_list[-1]['id']}")
    if len(users_list) == 1:
        print("[INFO] No new users to load.")
        return userCache, botCache, cursorCache
    print("[INFO] Building user and bot mappings now, this shouldn't take long...")
    for user in users_list:
        # Map user ID mentions to user ID + name mentions, it's nicer when printing messages.
        profile = user["profile"]
        # Prefer the display name; fall back to the real name, then to the raw name.
        name = (
            profile.get("display_name_normalized")
            or profile.get("real_name_normalized")
            or user["name"]
        )
        userCache[f"<@{user['id']}>"] = f"<@{user['id']}|{name}>"
        if user["is_bot"]:
            botCache[profile["bot_id"]] = user["id"]
    return userCache, botCache, cursorCache
def __innerMessageParser(message: dict) -> dict:
    """Ensure *message* has a "user" entry, filling one in when it is missing.

    The dict is modified in place and also returned. An existing "user" is
    kept. Otherwise a "bot_id" is looked up in the module-level botMappings,
    and an unknown bot gets the placeholder "<bot_id>|UNKNOWN BOT". If neither
    key yields a user, "FALLBACK|UNKNOWN USER" is stored. Any exception raised
    while resolving is printed and replaced with a placeholder instead of
    propagating.
    """
    try:
        if not message.get("user") and message.get("bot_id"):  # Apps sometimes don't...
            bot_id = message["bot_id"]
            known_user = botMappings.get(bot_id)
            if known_user:
                message["user"] = known_user
            else:
                # Must be an f-string, otherwise the literal "{bot_id}" is printed.
                print(
                    f"[WARN] Unknown bot {bot_id}!\n[WARN] Cache may be out of date!"
                )
                message["user"] = f"{bot_id}|UNKNOWN BOT"
    except Exception:
        print("[WARN] Exception")
        for line in format_exc().split("\n")[:-1]:
            print(f"[WARN] {line}")
        print(f"[HELP] Raw message that caused this error: {message}")
        message["user"] = "AN EXCEPTION OCCURED|UNKOWN USER"
    if not message.get("user"):
        # Neither a user nor a resolvable bot: dump the raw message for debugging.
        print(message)
        message["user"] = "FALLBACK|UNKNOWN USER"
    return message
def buildThreadedMessages(messages: list) -> dict:
    """Build menu labels for *messages* and map each label to its list index.

    Args:
        messages: sequence of Slack message dicts; each one is passed through
            __innerMessageParser, which may add a "user" entry in place.

    Returns:
        dict of "[ts] <@user>: text" label -> index of that message in
        *messages*, with known "<@ID>" mentions rewritten via userMappings.
    """
    print("[INFO] Building messages, this might take a little bit...")
    texts = {}
    for index, raw in enumerate(messages):
        message = __innerMessageParser(raw)
        label = f'[{message["ts"]}] <@{message["user"]}>: {message["text"]}'
        # Look up only the mention tokens that actually occur in the label
        # instead of trying every cached user against every message.
        label = re.sub(
            r"<@[^<>]+>",
            lambda match: userMappings.get(match.group(0), match.group(0)),
            label,
        )
        texts[label] = index
    return texts
def buildMessages(messages: list) -> str:
    """Print *messages* from the last list element to the first.

    Args:
        messages: sequence of Slack message dicts; each one is passed through
            __innerMessageParser, which may add a "user" entry in place.

    Returns:
        The "ts" of ``messages[0]``.

    Raises:
        IndexError: if *messages* is empty.
    """
    print("[INFO] Building messages, this might take a little bit...")
    for raw in reversed(messages):
        message = __innerMessageParser(raw)
        msg = f'[MSGS] [{message["ts"]}] <@{message["user"]}>: {message["text"]}'
        # Look up only the mention tokens that actually occur in the line
        # instead of trying every cached user against every message.
        msg = re.sub(
            r"<@[^<>]+>",
            lambda match: userMappings.get(match.group(0), match.group(0)),
            msg,
        )
        print(msg)
    return messages[0]["ts"]
def messaging() -> NoReturn:
    """Run the interactive read/send loop; never returns normally.

    Outer loop: prompt for a channel ID and try to print its last 50 messages.
    Inner loop: optionally choose a thread (from a menu or by raw TS ID) and
    post replies to it; otherwise post a message to the channel. After either
    path, fetch and print messages newer than the last one shown.

    KeyboardInterrupt inside the thread reply loop returns to the channel
    loop; KeyboardInterrupt elsewhere inside the channel loop returns to the
    channel ID prompt. Other exceptions are printed as [WARN] lines.
    """
    while 1:
        # NOTE(review): this prompt is outside the try below, so ^C here is not caught.
        chan = input("Channel ID")
        try:
            # "ts" of messages[0] from the last history fetch; "" until one succeeds.
            oldest_ts = ""
            try:
                print(
                    "[INFO] Trying to load the last 50 messages sent in this channel..."
                )
                res = client.conversations_history(
                    channel=chan, inclusive=True, limit=50
                )
                buildMessages(res["messages"])
                oldest_ts = res["messages"][0]["ts"]
                del res
            except Exception as E:
                # Best effort: a failed history load does not stop the user from sending.
                print("[WARN] Exception")
                for line in format_exc().split("\n")[:-1]:
                    print(f"[WARN] {line}")
                print(
                    "[HELP] The bot probably isn't in this channel. If it's public you can likely send anyways, but this will fail otherwise."
                )
            print("[INFO] ^C to change channel")
            while 1:
                thread = input("Reply to a thread? (y|N)").lower().startswith("y")
                # Stays None unless a thread is chosen here or a channel message is sent below.
                ts = None
                if thread:
                    hasID = (
                        input("Do you have the TS ID? (y|N)").lower().startswith("y")
                    )
                    if not hasID:
                        try:
                            print(
                                "[INFO] Getting the last 50 messages for threading options..."
                            )
                            res = client.conversations_history(
                                channel=chan, inclusive=True, limit=50
                            )
                            messages = res["messages"]
                            # Maps each display label to the message's index in `messages`.
                            texts = buildThreadedMessages(messages)
                            # presumably fp.menu returns the value for the chosen label,
                            # i.e. the index into `messages` — confirm against firepup650
                            found = messages[
                                fp.menu(
                                    texts,
                                    "Please select the message to reply to as a thread",
                                )
                            ]
                            ts = found["ts"]
                        except Exception as E:
                            print("[WARN] Exception:")
                            for line in format_exc().split("\n")[:-1]:
                                print(f"[WARN] {line}")
                            print(
                                "[HELP] Does the bot have access to the channel you're trying to see?"
                            )
                            # Leave the channel loop and go back to the channel ID prompt.
                            break
                    else:
                        ts = input("TS ID")
                    print(
                        "[INFO] ^C to change/exit thread (^C twice if you want to change channel)"
                    )
                    try:
                        while 1:
                            # A typed backslash-n becomes a real newline in the message.
                            msg = input(
                                "[THRD] Message (Raw text, not blocks)"
                            ).replace("\\n", "\n")
                            try:
                                client.chat_postMessage(
                                    channel=chan, text=msg, thread_ts=ts
                                )
                                print("[INFO] Message sent (to the thread)!")
                            except Exception as E:
                                print("[WARN] Exception:")
                                for line in format_exc().split("\n")[:-1]:
                                    print(f"[WARN] {line}")
                                # A failed send ends the thread reply loop.
                                break
                    except KeyboardInterrupt:
                        # ^C ends the thread reply loop only.
                        print()
                if ts:
                    # A thread was used: refresh the channel view, then start the loop over.
                    try:
                        print(f"[INFO] Trying to load messages since {oldest_ts}...")
                        res = client.conversations_history(
                            channel=chan, inclusive=True, limit=200, oldest=oldest_ts
                        )
                        # presumably the last entry is the already-shown message at
                        # oldest_ts (inclusive=True), hence the [:-1] — confirm
                        if len(res["messages"]) > 1:
                            buildMessages(res["messages"][:-1])
                            oldest_ts = res["messages"][0]["ts"]
                        else:
                            print("[INFO] No new messages")
                        del res
                    except Exception as E:
                        print("[WARN] Exception")
                        for line in format_exc().split("\n")[:-1]:
                            print(f"[WARN] {line}")
                        print(
                            "[HELP] Does the bot have access to the channel you're trying to see?"
                        )
                    continue
                # A typed backslash-n becomes a real newline in the message.
                msg = input("[CHAN] Message (Raw text, not blocks)").replace(
                    "\\n", "\n"
                )
                try:
                    # Empty input sends nothing.
                    if msg != "":
                        ts = client.chat_postMessage(channel=chan, text=msg)["ts"]
                        print(f"[INFO] Message sent (to the channel)! (TS ID: {ts})")
                    # NOTE(review): the source's indentation was lost; this refresh is
                    # assumed to run even when msg is empty — confirm against the original.
                    try:
                        print(f"[INFO] Trying to load messages since {oldest_ts}...")
                        res = client.conversations_history(
                            channel=chan, inclusive=True, limit=200, oldest=oldest_ts
                        )
                        if len(res["messages"]) > 1:
                            buildMessages(res["messages"][:-1])
                            oldest_ts = res["messages"][0]["ts"]
                        else:
                            print("[INFO] No new messages")
                        del res
                    except Exception as E:
                        print("[WARN] Exception")
                        for line in format_exc().split("\n")[:-1]:
                            print(f"[WARN] {line}")
                        print(
                            "[HELP] Does the bot have access to the channel you're trying to see?"
                        )
                except Exception as E:
                    print("[WARN] Exception:")
                    for line in format_exc().split("\n")[:-1]:
                        print(f"[WARN] {line}")
                    # A failed send leaves the channel loop and returns to the channel ID prompt.
                    break
        except KeyboardInterrupt:
            # ^C anywhere in the channel loop returns to the channel ID prompt.
            print()
    # The below code will never run, but linters are dumb and need to be assured there is no possible return from a `NoReturn` function.
    exit(1)
# Module-level lookup tables read by the message helpers:
#   userMappings: "<@ID>" -> "<@ID|name>",  botMappings: bot_id -> user ID.
userMappings = {}
botMappings = {}
cursor = "N/A"
try:
    if "--no-cache" in argv:
        print("[INFO] Skipping cache on user request")
        # Raising ImportError reuses the full-load fallback below.
        raise ImportError("User requested to skip cache")
    print("[INFO] Trying to load user and app mappings from cache...")
    from cache import userMappings, cursorCache, botMappings

    print(
        """[INFO] Cache load OK.
[INFO] Reminder: If you need to regenerate the cache, call the script with `--no-cache`"""
    )
    print("[INFO] Checking for slack users newer than my cache...")
    userMappings, botMappings, cursor = __generateCache(
        userMappings, botMappings, cursorCache
    )
    # Only rewrite the cache file when the incremental load moved the cursor.
    if cursor != cursorCache:
        print("[INFO] New user and app mappings generated, writing cache file now...")
        __writeCache(userMappings, botMappings, cursor)
except (ImportError, SyntaxError):
    # ImportError: cache.py is missing, lacks a name, or --no-cache was given.
    # SyntaxError: cache.py exists but is not valid Python (e.g. a truncated
    # write); rebuild it instead of crashing.
    print("[WARN] Cache load failed, falling back to full load from slack...")
    userMappings, botMappings, cursor = __generateCache({}, {}, "N/A")
    print("[INFO] All user and app mappings generated, writing cache file now...")
    __writeCache(userMappings, botMappings, cursor)
print("[INFO] User mappings loaded. User count:", len(userMappings))
print("[INFO] Bot mappings loaded. Bot count:", len(botMappings))

if __name__ == "__main__":
    print("[INFO] ^D at any time to terminate program")
    messaging()