2025-03-20 09:18:19 -05:00
from os import environ as env
from sys import argv
2025-02-28 11:34:28 -06:00
from slack_bolt import App
2025-03-20 09:08:53 -05:00
from slack_sdk . errors import SlackApiError
2025-02-28 11:34:28 -06:00
from dotenv import load_dotenv
import firepup650 as fp
from traceback import format_exc
2025-03-20 09:08:53 -05:00
from time import sleep
from base64 import b64encode
2025-03-20 09:57:36 -05:00
from typing import NoReturn
2025-02-28 11:34:28 -06:00
# Route every prompt in this script through firepup650's Replit-style input
# helper. This deliberately shadows the builtin `input` for the whole module.
input = fp.replitInput
# Totally not hijacking one of my functions to use ;P
# (presumably replitInput renders `replitCursor` after the prompt text; verify
# against the firepup650 package)
fp.replitCursor = fp.bcolors.REPLIT + " >>> " + fp.bcolors.RESET

# Pull variables from a local .env file into the process environment.
load_dotenv()

# Fail fast, with an actionable message, when a required setting is absent or empty.
for requiredVar in ["SLACK_BOT_TOKEN"]:
    if env.get(requiredVar):
        continue
    raise ValueError(
        f'Missing required environment variable "{requiredVar}". Please create a .env file in the same directory as this script and define the missing variable.'
    )

print("[INFO] Establishing a connection to slack...")
app = App(token=env.get("SLACK_BOT_TOKEN"))
# All Slack Web API calls below go through this client.
client = app.client
2025-02-28 12:10:56 -06:00
2025-03-20 09:08:53 -05:00
def encode(string: str) -> str:
    """Return *string* encoded as Base64 text.

    The input is encoded to UTF-8 bytes, Base64-encoded, and the resulting
    bytes are decoded back to ``str``.
    """
    rawBytes = string.encode("utf-8")
    encodedBytes = b64encode(rawBytes)
    return encodedBytes.decode("utf-8")
def __writeCache(userCache, botCache, cursorCache):
    """Persist the user/bot mappings and pagination cursor to ``cache.py``.

    The cache is written as an importable Python module defining
    ``userMappings``, ``botMappings`` and ``cursorCache``.

    Args:
        userCache: Mapping written verbatim (via its repr) as ``userMappings``.
        botCache: Mapping written verbatim (via its repr) as ``botMappings``.
        cursorCache: Cursor string written as ``cursorCache``.
    """
    # It is many times faster to load from a local file instead of from slack.
    # The file is later imported as Python source, which is decoded as UTF-8 by
    # default, so write it as UTF-8 explicitly rather than in the locale
    # encoding (display names are not guaranteed to be ASCII).
    with open("cache.py", "w", encoding="utf-8") as cacheFile:
        cacheFile.writelines(
            [
                f"userMappings = {userCache}\n",
                f"botMappings = {botCache}\n",
                # repr() quotes/escapes the value so the line is always a valid literal.
                f"cursorCache = {cursorCache!r}\n",
            ]
        )
    print("[INFO] Cache saved.")
def __generateCache(userCache, botCache, cursor):
    """Load users from Slack and fold them into the mention/bot mappings.

    Pages through ``client.users_list`` starting at *cursor* until Slack stops
    returning a ``next_cursor``, sleeping whenever Slack answers with a
    ``retry-after`` header.

    Args:
        userCache: Dict updated in place; maps ``<@ID>`` to ``<@ID|name>``.
        botCache: Dict updated in place; maps a bot ID to its user ID.
        cursor: Pagination cursor to resume from, or ``"N/A"`` for a full load.

    Returns:
        ``(userCache, botCache, cursorCache)`` where ``cursorCache`` is a
        cursor built from the last user received.

    Raises:
        SlackApiError: For any Slack error that is not a rate limit.
        SystemExit: If Slack returned no users at all.
    """
    # Remember how we started: only a resumed load can contain "just the user we
    # already had", a fresh load with one user is a real (tiny) workspace.
    incremental = cursor != "N/A"
    users_list = []
    pages = 0
    while cursor:  # If slack gives us a cursor, then we ain't done loading user data yet
        data = None
        while data is None:  # Ratelimit logic
            try:
                if cursor != "N/A":
                    data = client.users_list(cursor=cursor, limit=1000)
                else:
                    data = client.users_list(limit=1000)
            except SlackApiError as e:
                retry = e.response.headers.get("retry-after")
                if retry is None:
                    # Not a rate limit (bad token, missing scope, ...): surface
                    # the real error instead of a KeyError on the header lookup.
                    raise
                print(
                    f"[WARN] Ratelimit hit! Sleeping for {retry} seconds as the retry-after header has specified"
                )
                sleep(int(retry))
                print("[WARN] Resuming..")
        cursor = data["response_metadata"]["next_cursor"]
        users_list.extend(data["members"])
        pages += 1
        print(
            f"[INFO] Pages of users loaded: {pages} ({'User count is less than ' if not cursor else 'Estimated user count so far: '}{pages}000)"
        )
    if not users_list:
        raise SystemExit(
            "[EXIT] Slack returned exactly zero users when given a cursor, which means my cursor is corrupt. Please delete cache.py and re-run the script."
        )
    # Cursor that resumes from the last user we have seen.
    cursorCache = encode(f"user:{users_list[-1]['id']}")
    if incremental and len(users_list) == 1:
        # A resumed load that yields a single user has nothing new in it.
        print("[INFO] No new users to load.")
        return userCache, botCache, cursorCache
    print("[INFO] Building user and bot mappings now, this shouldn't take long...")
    # Map user ID mentions to user ID + name mentions, it's nicer when printing messages.
    for user in users_list:
        profile = user.get("profile") or {}
        # Prefer the display name, fall back to the real name, then the raw name.
        shownName = (
            profile.get("display_name_normalized")
            or profile.get("real_name_normalized")
            or user["name"]
        )
        userCache[f"<@{user['id']}>"] = f"<@{user['id']}|{shownName}>"
        # Only record bots that actually carry a bot ID to key on.
        if user.get("is_bot") and profile.get("bot_id"):
            botCache[profile["bot_id"]] = user["id"]
    return userCache, botCache, cursorCache
def __innerMessageParser(message: dict) -> dict:
    """Ensure *message* has a ``"user"`` entry, resolving bots where possible.

    When the message has no ``user`` but has a ``bot_id``, the bot is looked up
    in ``botMappings``. Unknown bots, lookup failures and messages with
    neither field get a placeholder value instead.

    Args:
        message: Message dict; it is modified in place.

    Returns:
        The same dict, always with a truthy ``"user"`` value.
    """
    try:
        # Apps sometimes don't provide a user, only a bot ID.
        if not message.get("user") and message.get("bot_id"):
            bot_id = message["bot_id"]
            if botMappings.get(bot_id):
                message["user"] = botMappings[bot_id]
            else:
                # f-string so the actual bot ID is shown, not the literal "{bot_id}".
                print(f"[WARN] Unknown bot {bot_id}!")
                print("[WARN] Cache may be out of date!")
                message["user"] = f"{bot_id}|UNKNOWN BOT"
    except Exception:
        # Best effort: never let one odd message abort rendering of the rest.
        print("[WARN] Exception")
        for line in format_exc().split("\n")[:-1]:
            print(f"[WARN] {line}")
        print(f"[HELP] Raw message that caused this error: {message}")
        message["user"] = "AN EXCEPTION OCCURED|UNKOWN USER"
    if not message.get("user"):
        # Neither a user nor a bot ID: dump the raw message to help diagnose it.
        print(message)
        message["user"] = "FALLBACK|UNKNOWN USER"
    return message
2025-02-28 12:10:56 -06:00
def buildThreadedMessages(messages: dict) -> dict:
    """Build menu labels for *messages*, keyed to each message's position.

    Args:
        messages: Messages to label; consumed positionally (index 0..len-1).

    Returns:
        Dict mapping a ``[ts] <@user>: text`` label (with known user mentions
        expanded through ``userMappings``) to the message's index.
    """
    print("[INFO] Building messages, this might take a little bit...")
    labelToIndex = {}
    for position, rawMessage in enumerate(messages):
        parsed = __innerMessageParser(rawMessage)
        entry = f'[{parsed["ts"]}] <@{parsed["user"]}>: {parsed["text"]}'
        # Swap bare ID mentions for their "ID|name" form.
        for mention, expanded in userMappings.items():
            entry = entry.replace(mention, expanded)
        labelToIndex[entry] = position
    return labelToIndex
2025-02-28 12:26:20 -06:00
def buildMessages(messages: list) -> str:
    """Print *messages* from last to first and return the first one's ``ts``.

    Each message is printed as ``[MSGS] [ts] <@user>: text`` with known user
    mentions expanded through ``userMappings``.

    Args:
        messages: Sequence of message dicts. It is walked in reverse, so a
            newest-first list is printed oldest-first.

    Returns:
        The ``ts`` of ``messages[0]``, or ``""`` when *messages* is empty.
    """
    print("[INFO] Building messages, this might take a little bit...")
    if not messages:
        # Nothing to print; avoid the IndexError on messages[0] below.
        return ""
    for rawMessage in reversed(messages):
        message = __innerMessageParser(rawMessage)
        msg = f'[MSGS] [{message["ts"]}] <@{message["user"]}>: {message["text"]}'
        # Swap bare ID mentions for their "ID|name" form.
        for mention, expanded in userMappings.items():
            msg = msg.replace(mention, expanded)
        print(msg)
    return messages[0]["ts"]
2025-02-28 12:10:56 -06:00
2025-03-20 09:57:36 -05:00
def __printTraceback(header: str) -> None:
    """Print *header*, then the active exception's traceback tagged ``[WARN]``."""
    print(header)
    for line in format_exc().split("\n")[:-1]:
        print(f"[WARN] {line}")


def __loadNewMessages(chan: str, oldest_ts: str) -> str:
    """Print messages posted in *chan* since *oldest_ts*; return the new marker.

    The request is inclusive, so the message at *oldest_ts* itself comes back
    as the last entry and is skipped. Failures are reported and leave the
    marker unchanged.
    """
    try:
        print(f"[INFO] Trying to load messages since {oldest_ts}...")
        res = client.conversations_history(
            channel=chan, inclusive=True, limit=200, oldest=oldest_ts
        )
        fresh = res["messages"]
        if len(fresh) > 1:
            buildMessages(fresh[:-1])
            oldest_ts = fresh[0]["ts"]
        else:
            print("[INFO] No new messages")
    except Exception:
        __printTraceback("[WARN] Exception")
        print(
            "[HELP] Does the bot have access to the channel you're trying to see?"
        )
    return oldest_ts


def __pickThreadTs(chan: str) -> str:
    """Let the user choose one of the last 50 messages in *chan*; return its ``ts``."""
    print("[INFO] Getting the last 50 messages for threading options...")
    res = client.conversations_history(channel=chan, inclusive=True, limit=50)
    messages = res["messages"]
    texts = buildThreadedMessages(messages)
    found = messages[
        fp.menu(texts, "Please select the message to reply to as a thread")
    ]
    return found["ts"]


def messaging() -> NoReturn:
    """Run the interactive send loop; never returns normally.

    Repeatedly asks for a channel ID, shows its recent messages, then lets the
    user post to the channel or to a thread. ^C steps back one level (thread ->
    channel -> channel prompt); an empty channel message just refreshes.
    """
    while True:
        chan = input("Channel ID")
        try:
            oldest_ts = ""
            try:
                print(
                    "[INFO] Trying to load the last 50 messages sent in this channel..."
                )
                res = client.conversations_history(
                    channel=chan, inclusive=True, limit=50
                )
                history = res["messages"]
                if history:
                    buildMessages(history)
                    oldest_ts = history[0]["ts"]
                else:
                    # An empty channel is not an access problem; don't let the
                    # [0] lookup raise and trigger the misleading hint below.
                    print("[INFO] No messages in this channel yet")
                del res
            except Exception:
                __printTraceback("[WARN] Exception")
                print(
                    "[HELP] The bot probably isn't in this channel. If it's public you can likely send anyways, but this will fail otherwise."
                )
            print("[INFO] ^C to change channel")
            while True:
                thread = input("Reply to a thread? (y|N)").lower().startswith("y")
                ts = None
                if thread:
                    hasID = (
                        input("Do you have the TS ID? (y|N)").lower().startswith("y")
                    )
                    if not hasID:
                        try:
                            ts = __pickThreadTs(chan)
                        except Exception:
                            __printTraceback("[WARN] Exception:")
                            print(
                                "[HELP] Does the bot have access to the channel you're trying to see?"
                            )
                            # Can't list messages here: go back to the channel prompt.
                            break
                    else:
                        ts = input("TS ID")
                    print(
                        "[INFO] ^C to change/exit thread (^C twice if you want to change channel)"
                    )
                    try:
                        while True:
                            msg = input(
                                "[THRD] Message (Raw text, not blocks)"
                            ).replace("\\n", "\n")
                            try:
                                client.chat_postMessage(
                                    channel=chan, text=msg, thread_ts=ts
                                )
                                print("[INFO] Message sent (to the thread)!")
                            except Exception:
                                __printTraceback("[WARN] Exception:")
                                break
                    except KeyboardInterrupt:
                        print()
                if ts:
                    # Coming back from a thread: catch up on the channel, then re-prompt.
                    oldest_ts = __loadNewMessages(chan, oldest_ts)
                    continue
                msg = input("[CHAN] Message (Raw text, not blocks)").replace(
                    "\\n", "\n"
                )
                try:
                    if msg != "":
                        ts = client.chat_postMessage(channel=chan, text=msg)["ts"]
                        print(f"[INFO] Message sent (to the channel)! (TS ID: {ts})")
                    # Runs for an empty message too, which makes "just press
                    # enter" act as a refresh.
                    oldest_ts = __loadNewMessages(chan, oldest_ts)
                except Exception:
                    __printTraceback("[WARN] Exception:")
                    break
        except KeyboardInterrupt:
            print()
    # The below code will never run, but linters are dumb and need to be assured there is no possible return from a `NoReturn` function.
    exit(1)
# Mention/bot lookup tables used by the message builders above.
userMappings = {}
botMappings = {}
cursor = "N/A"

# Step 1: try the on-disk cache unless the user opted out with --no-cache.
cacheLoaded = False
if "--no-cache" in argv:
    print("[INFO] Skipping cache on user request")
else:
    print("[INFO] Trying to load user and app mappings from cache...")
    try:
        from cache import userMappings, cursorCache, botMappings
    except (ImportError, SyntaxError):
        # cache.py is missing, lacks one of the names, or is not valid Python
        # (e.g. truncated by an interrupted write): fall back to a full load.
        cacheLoaded = False
    else:
        cacheLoaded = True

if cacheLoaded:
    print("[INFO] Cache load OK.")
    print(
        "[INFO] Reminder: If you need to regenerate the cache, call the script with `--no-cache`"
    )
    # Step 2a: top the cache up with any users created since it was written.
    print("[INFO] Checking for slack users newer than my cache...")
    userMappings, botMappings, cursor = __generateCache(
        userMappings, botMappings, cursorCache
    )
    if cursor != cursorCache:
        print("[INFO] New user and app mappings generated, writing cache file now...")
        __writeCache(userMappings, botMappings, cursor)
else:
    # Step 2b: no usable cache, so rebuild everything from Slack.
    print("[WARN] Cache load failed, falling back to full load from slack...")
    userMappings, botMappings, cursor = __generateCache({}, {}, "N/A")
    print("[INFO] All user and app mappings generated, writing cache file now...")
    __writeCache(userMappings, botMappings, cursor)

print("[INFO] User mappings loaded. User count:", len(userMappings))
print("[INFO] Bot mappings loaded. Bot count:", len(botMappings))

if __name__ == "__main__":
    print("[INFO] ^D at any time to terminate program")
    messaging()