refactor permissions and allow hostmasks to be assigned to accounts

This commit is contained in:
jesopo 2019-11-21 15:49:08 +00:00
parent 9f8c5acf52
commit 2dfc55fb9f
8 changed files with 253 additions and 301 deletions

View file

@ -305,7 +305,7 @@ class Module(ModuleManager.BaseModule):
flags = channel.get_user_setting(user.get_id(), "flags", "")
if flags:
identified = not user.get_identified_account() == None
identified = not user._id_override == None
current_modes = channel.get_user_modes(user)
modes = []

View file

@ -218,12 +218,12 @@ class Module(ModuleManager.BaseModule):
# response to a WHO command for user information
@utils.hook("raw.received.352", default_event=True)
def handle_352(self, event):
core.handle_352(event)
core.handle_352(self.events, event)
# response to a WHOX command for user information, including account name
@utils.hook("raw.received.354", default_event=True)
def handle_354(self, event):
core.handle_354(event)
core.handle_354(self.events, event)
# response to an empty mode command
@utils.hook("raw.received.324")

View file

@ -72,8 +72,7 @@ def join(events, event):
hostname=event["line"].source.hostname)
if account:
user.identified_account = account
user.identified_account_id = event["server"].get_user(account).get_id()
user.account = account
if realname:
user.realname = realname

View file

@ -105,7 +105,7 @@ def invite(events, event):
events.on("received.invite").call(user=user, target_channel=target_channel,
server=event["server"], target_user=target_user)
def handle_352(event):
def handle_352(events, event):
nickname = event["line"].args[5]
username = event["line"].args[2]
hostname = event["line"].args[3]
@ -117,8 +117,10 @@ def handle_352(event):
target = event["server"].get_user(nickname)
target.username = username
target.hostname = hostname
events.on("received.who").call(server=event["server"],
user=target)
def handle_354(event):
def handle_354(events, event):
if event["line"].args[1] == "111":
nickname = event["line"].args[4]
username = event["line"].args[2]
@ -136,11 +138,11 @@ def handle_354(event):
target.hostname = hostname
target.realname = realname
if not account == "0":
target.identified_account = account
target.identified_account_id = event["server"].get_user(account
).get_id()
target.account = account
else:
target.identified_account = None
target.account = None
events.on("received.whox").call(server=event["server"],
user=target)
def _nick_in_use(server):
new_nick = "%s|" % server.connection_params.nickname

View file

@ -39,6 +39,8 @@ def nick(events, event):
new_nickname = event["line"].args.get(0)
user = event["server"].get_user(event["line"].source.nickname)
old_nickname = user.nickname
user.set_nickname(new_nickname)
event["server"].change_user_nickname(old_nickname, new_nickname)
if not event["server"].is_own_nickname(event["line"].source.nickname):
events.on("received.nick").call(new_nickname=new_nickname,
@ -48,9 +50,6 @@ def nick(events, event):
new_nickname=new_nickname, old_nickname=old_nickname)
event["server"].set_own_nickname(new_nickname)
user.set_nickname(new_nickname)
event["server"].change_user_nickname(old_nickname, new_nickname)
def away(events, event):
user = event["server"].get_user(event["line"].source.nickname)
message = event["line"].args.get(0)
@ -94,13 +93,10 @@ def account(events, event):
user = event["server"].get_user(event["line"].source.nickname)
if not event["line"].args[0] == "*":
user.identified_account = event["line"].args[0]
user.identified_account_id = event["server"].get_user(
event["line"].args[0]).get_id()
user.account = event["line"].args[0]
events.on("received.account.login").call(user=user,
server=event["server"], account=event["line"].args[0])
else:
user.identified_account = None
user.identified_account_id = None
user.account = None
events.on("received.account.logout").call(user=user,
server=event["server"])

View file

@ -1,38 +0,0 @@
# Permissions
## Adding an admin user
This is a little complex at the moment but it will get easier some time soon.
### Registering user
Join a channel that BitBot is in (he'll automatically join #bitbot with default
configuration) and then type
> /msg <botnick> register <password>
### Give * permission
The `*` permission is a special permission that gives you completely unfettered
access to all of BitBot's functions.
On IRC, send this to BitBot and take note of the ID response
> /msg <botnick> myid
Then take that ID and open the database in sqlite3 (default database location is
`databases/bot.db`
> $ sqlite3 databases/bot.db
And then insert your `*` permission
> INSERT INTO user_settings VALUES (<id>, 'permissions', '["*"]');
(where `<id>` is the response from the `myid` command)
### Authenticating
To authenticate yourself as your admin user, use the following command
> /msg &lt;botnick> identify &lt;password>

View file

@ -1,52 +1,20 @@
#--depends-on commands
#--depends-on config
import base64, binascii, os
import scrypt
from src import ModuleManager, utils
REQUIRES_IDENTIFY = "You need to be identified to use that command"
REQUIRES_IDENTIFY_INTERNAL = ("You need to be identified to use that command "
"(/msg %s register | /msg %s identify)")
HOSTMASKS_SETTING = "hostmask-account"
NO_PERMISSION = "You do not have permission to do that"
@utils.export("serverset", utils.OptionsSetting(["internal", "ircv3-account"],
"identity-mechanism", "Set the identity mechanism for this server"))
class Module(ModuleManager.BaseModule):
@utils.hook("new.user")
def new_user(self, event):
self._logout(event["user"])
event["user"].admin_master = False
@utils.hook("new.server")
def new_server(self, event):
hostmasks = {}
def _master_password(self):
master_password = self._random_password()
hash, salt = self._make_hash(master_password)
self.bot.set_setting("master-password", [hash, salt])
return master_password
@utils.hook("control.master-password")
def command_line(self, event):
master_password = self._master_password()
return "One-time master password: %s" % master_password
@utils.hook("received.command.masterpassword", private_only=True)
def master_password(self, event):
"""
:permission: master-password
"""
master_password = self._master_password()
event["stdout"].write("One-time master password: %s" %
master_password)
@utils.hook("received.part")
def on_part(self, event):
if len(event["user"].channels) == 0 and event["user"
].identified_account_override:
event["user"].send_notice("You no longer share any channels "
"with me so you have been signed out")
def _get_hash(self, server, account):
hash, salt = server.get_user(account).get_setting("authentication",
(None, None))
return hash, salt
for account, user_hostmasks in event["server"].get_all_user_settings(
HOSTMASKS_SETTING):
for hostmask in user_hostmasks:
hostmasks[hostmask] = account
event["server"]._hostmasks = hostmasks
def _make_salt(self):
return base64.b64encode(os.urandom(64)).decode("utf8")
@ -59,46 +27,135 @@ class Module(ModuleManager.BaseModule):
hash = base64.b64encode(scrypt.hash(password, salt)).decode("utf8")
return hash, salt
def _identified(self, server, user, account):
user.identified_account_override = account
user.identified_account_id_override = server.get_user(account).get_id()
def _logout(self, user):
user.identified_account_override = None
user.identified_account_id_override = None
@utils.hook("received.command.masterlogin", private_only=True, min_args=1)
def master_login(self, event):
saved_hash, saved_salt = self.bot.get_setting("master-password",
def _get_hash(self, server, account):
hash, salt = server.get_user(account).get_setting("authentication",
(None, None))
return hash, salt
if saved_hash and saved_salt:
given_hash, _ = self._make_hash(event["args"], saved_salt)
if utils.security.constant_time_compare(given_hash, saved_hash):
self.bot.del_setting("master-password")
event["user"].admin_master = True
event["stdout"].write("Master login successful")
return
event["stderr"].write("Master login failed")
def _has_identified(self, server, user, account):
user._id_override = server.get_user_id(account)
def _is_identified(self, user):
return not user._id_override == None
def _signout(self, user):
user._id_override = None
def _find_hostmask(self, server, user):
user_hostmask = user.hostmask()
for hostmask in server._hostmasks.keys():
if utils.irc.hostmask_match(user_hostmask, hostmask):
return (hostmask, server._hostmasks[hostmask])
def _specific_hostmask(self, server, hostmask, account):
for user in server.users.values():
if utils.irc.hostmask_match(user.hostmask(), hostmask):
if account == None:
user._hostmask_account = None
self._signout(user)
else:
user._hostmask_account = (hostmask, account)
self._has_identified(server, user, account)
def _account_name(self, user):
if not user.account == None:
return user.account
elif not user._account_override == None:
return user._account_override
elif not user._hostmask_account == None:
return user._hostmask_account[1]
@utils.hook("new.user")
def new_user(self, event):
event["user"]._hostmask_account = None
event["user"]._account_override = None
def _set_hostmask(self, server, user):
account = self._find_hostmask(server, user)
if not account == None:
hostmask, account = account
user._hostmask_account = (hostmask, account)
self._has_identified(server, user, account)
@utils.hook("received.chghost")
@utils.hook("received.nick")
def chghost(self, event):
if not self._is_identified(event["user"]):
self._set_hostmask(event["server"], event["user"])
@utils.hook("received.whox")
@utils.hook("received.account")
@utils.hook("received.account.login")
@utils.hook("received.account.logout")
@utils.hook("received.join")
def check_account(self, event):
if not self._is_identified(event["user"]):
if event["user"].account:
self._has_identified(event["server"], event["user"],
event["user"].account)
else:
self._set_hostmask(event["server"], event["user"])
def _get_permissions(self, user):
if self._is_identified(user):
return user.get_setting("permissions", [])
return []
def _has_permission(self, user, permission):
permissions = self._get_permissions(user)
if permission in permissions:
return True
else:
permission_parts = permission.split(".")
for user_permission in permissions:
user_permission_parts = user_permission.split(".")
for i, part in enumerate(permission_parts):
last = i==(len(permission_parts)-1)
user_last = i==(len(user_permission_parts)-1)
if not permission_parts[i] == user_permission_parts[i]:
if user_permission_parts == "*" and user_last:
return True
else:
break
else:
if last and user_last:
return True
return False
@utils.hook("received.command.mypermissions")
@utils.kwarg("authenticated", True)
def my_permissions(self, event):
"""
:help: Show your permissions
"""
permissions = event["user"].get_setting("permissions", [])
event["stdout"].write("Your permissions: %s" % ", ".join(permissions))
@utils.hook("received.command.register", private_only=True, min_args=1)
@utils.kwarg("min_args", 1)
@utils.kwarg("private_only", True)
@utils.kwarg("help", "Register your nickname")
@utils.kwarg("usage", "<password>")
def register(self, event):
hash, salt = self._get_hash(event["server"], event["user"].nickname)
if not hash and not salt:
password = event["args"]
hash, salt = self._make_hash(password)
event["user"].set_setting("authentication", [hash, salt])
self._has_identified(event["server"], event["user"],
event["user"].nickname)
event["stdout"].write("Nickname registered successfully")
else:
event["stderr"].write("This nickname is already registered")
@utils.hook("received.command.identify", private_only=True, min_args=1)
@utils.kwarg("min_args", 1)
@utils.kwarg("private_only", True)
@utils.kwarg("help", "Identify for your current nickname")
@utils.kwarg("usage", "[account] <password>")
def identify(self, event):
"""
:help: Identify yourself
:usage: [account] <password>
"""
identity_mechanism = event["server"].get_setting("identity-mechanism",
"internal")
if not identity_mechanism == "internal":
raise utils.EventError("The 'identify' command isn't available "
"on this network")
if not event["user"].channels:
raise utils.EventError("You must share at least one channel "
"with me before you can identify")
if not event["user"].identified_account_override:
if not self._is_identified(event["user"]):
if len(event["args_split"]) > 1:
account = event["args_split"][0]
password = " ".join(event["args_split"][1:])
@ -110,9 +167,11 @@ class Module(ModuleManager.BaseModule):
if hash and salt:
attempt, _ = self._make_hash(password, salt)
if utils.security.constant_time_compare(attempt, hash):
self._identified(event["server"], event["user"], account)
event["user"]._account_override = account
self._has_identified(event["server"], event["user"], account)
event["stdout"].write("Correct password, you have "
"been identified as '%s'." % account)
"been identified as %s." % account)
self.events.on("internal.identified").call(
user=event["user"])
else:
@ -122,179 +181,118 @@ class Module(ModuleManager.BaseModule):
event["stderr"].write("Account '%s' is not registered" %
account)
else:
event["stderr"].write("You are already identified")
event["stderr"].write("You are already identified as %s" %
self._account_name(event["user"]))
@utils.hook("received.command.register", private_only=True, min_args=1)
def register(self, event):
"""
:help: Register yourself
:usage: <password>
"""
identity_mechanism = event["server"].get_setting("identity-mechanism",
"internal")
if not identity_mechanism == "internal":
raise utils.EventError("The 'identify' command isn't available "
"on this network")
@utils.hook("received.command.permission")
@utils.kwarg("min_args", 2)
@utils.kwarg("usage", "list <account>")
@utils.kwarg("usage", "clear <account>")
@utils.kwarg("usage", "add <account> <permission>")
@utils.kwarg("usage", "remove <account> <permission>")
@utils.kwarg("permission", "permissions.change")
def permission(self, event):
subcommand = event["args_split"][0].lower()
account = event["args_split"][1]
target_user = event["server"].get_user(account)
hash, salt = self._get_hash(event["server"], event["user"].nickname)
if not hash and not salt:
password = event["args"]
hash, salt = self._make_hash(password)
event["user"].set_setting("authentication", [hash, salt])
self._identified(event["server"], event["user"],
event["user"].nickname)
event["stdout"].write("Nickname registered successfully")
if subcommand == "list":
event["stdout"].write("Permissions for %s: %s" % (
account, ", ".join(self._get_permissions(target_user))))
elif subcommand == "clear":
if not self._get_permissions(target_user):
raise utils.EventError("%s has no permissions" % account)
target_user.del_setting("permissions")
event["stdout"].write("Cleared permissions for %s" % account)
else:
event["stderr"].write("This nickname is already registered")
permissions = event["args_split"][2:]
if not permissions:
raise utils.EventError("Please provide at least 1 permission")
user_permissions = self._get_permissions(target_user)
@utils.hook("received.command.setpassword", authenticated=True, min_args=1)
def set_password(self, event):
"""
:help: Change your password
:usage: <password>
"""
hash, salt = self._make_hash(event["args"])
event["user"].set_setting("authentication", [hash, salt])
event["stdout"].write("Set your password")
if subcommand == "add":
new = list(set(permissions)-set(user_permissions))
if not new:
raise utils.EventError("No new permissions to give")
target_user.set_setting("permissions", user_permissions+new)
event["stdout"].write("Gave %s new permissions: %s" %
(account, ", ".join(new)))
elif subcommand == "remove":
permissions_set = set(permissions)
user_permissions_set = set(user_permissions)
removed = list(user_permissions_set&permissions_set)
if not (user_permissions_set & permissions_set):
raise utils.EventError("New permissions to remove")
change = list(user_permissions_set - permissions_set)
@utils.hook("received.command.logout", private_only=True)
def logout(self, event):
"""
:help: Logout from your identified account
"""
if event["user"].identified_account_override:
self._logout(event["user"])
event["stdout"].write("You have been logged out")
else:
event["stderr"].write("You are not logged in")
@utils.hook("received.command.resetpassword", private_only=True,
min_args=2)
def reset_password(self, event):
"""
:help: Reset a given user's password
:usage: <nickname> <password>
:permission: resetpassword
"""
target = event["server"].get_user(event["args_split"][0])
password = " ".join(event["args_split"][1:])
registered = target.get_setting("authentication", None)
if registered == None:
event["stderr"].write("'%s' isn't registered" % target.nickname)
else:
hash, salt = self._make_hash(password)
target.set_setting("authentication", [hash, salt])
event["stdout"].write("Reset password for '%s'" %
target.nickname)
def _check_command(self, event, permission, authenticated):
if event["user"].admin_master and (permission or authenticated):
return utils.consts.PERMISSION_FORCE_SUCCESS, None
identity_mechanism = event["server"].get_setting("identity-mechanism",
"internal")
identified_account = None
if identity_mechanism == "internal":
identified_account = event["user"].identified_account_override
elif identity_mechanism == "ircv3-account":
identified_account = (event["user"].identified_account or
event["tags"].get("account", None))
identified_user = None
permissions = []
if identified_account:
identified_user = event["server"].get_user(identified_account)
permissions = identified_user.get_setting("permissions", [])
if permission:
has_permission = permission and (
permission in permissions or "*" in permissions)
if not identified_account or not has_permission:
return (utils.consts.PERMISSION_ERROR,
"You do not have permission to do that")
else:
return utils.consts.PERMISSION_FORCE_SUCCESS, None
elif authenticated:
if not identified_account:
error = None
if identity_mechanism == "internal":
error = REQUIRES_IDENTIFY_INTERNAL % (
event["server"].nickname, event["server"].nickname)
if not change:
target_user.del_setting("permissions")
else:
error = REQUIRES_IDENTIFY
return utils.consts.PERMISSION_ERROR, error
target_user.set_setting("permissions", change)
event["stdout"].write("Removed permissions from %s: %s" %
(account, ", ".join(change)))
else:
return utils.consts.PERMISSION_FORCE_SUCCESS, None
raise utils.EventError("Unknown subcommand %s" % subcommand)
@utils.hook("received.command.hostmask")
@utils.kwarg("min_args", 1)
@utils.kwarg("authenticated", True)
@utils.kwarg("usage", "list")
@utils.kwarg("usage", "add [hostmask]")
@utils.kwarg("usage", "remove [hostmask]")
def hostmask(self, event):
subcommand = event["args_split"][0].lower()
hostmasks = event["user"].get_setting(HOSTMASKS_SETTING, [])
if subcommand == "list":
event["stdout"].write("Your hostmasks: %s" % ", ".join(hostmasks))
else:
if event["args_split"][1:]:
hostmask = event["args_split"][1]
else:
hostmask = "*!%s" % event["user"].userhost()
account = self._account_name(event["user"])
if subcommand == "add":
if hostmask in hostmasks:
raise utils.EventError(
"Hostmask %s is already on your account" % hostmask)
hostmasks.append(hostmask)
event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
self._specific_hostmask(event["server"], hostmask, account)
event["server"]._hostmasks[hostmask] = account
event["stdout"].write("Added %s to your hostmasks" % hostmask)
elif subcommand == "remove":
if not hostmask in hostmasks:
raise utils.EventError("Hostmask %s is not on your account"
% hostmask)
while hostmask in hostmasks:
hostmasks.remove(hostmask)
event["user"].set_setting(HOSTMASKS_SETTING, hostmasks)
self._specific_hostmask(event["server"], hostmask, None)
if hostmask in event["server"]._hostmasks:
del event["server"]._hostmasks[hostmask]
event["stdout"].write("Removed %s from your hostmasks"
% hostmask)
else:
raise utils.EventError("Unknown subcommand %s" % subcommand)
@utils.hook("preprocess.command")
def preprocess_command(self, event):
allowed = None
permission = event["hook"].get_kwarg("permission", None)
authenticated = event["hook"].get_kwarg("authenticated", False)
return self._check_command(event, permission, authenticated)
if not permission == None:
allowed = self._has_permission(event["user"], permission)
elif not authenticated == None:
allowed = self._is_identified(event["user"])
@utils.hook("check.command.permission")
def check_command(self, event):
return self._check_command(event, event["request_args"][0], False)
@utils.hook("received.command.mypermissions", authenticated=True)
def my_permissions(self, event):
"""
:help: Show your permissions
"""
permissions = event["user"].get_setting("permissions", [])
event["stdout"].write("Your permissions: %s" % ", ".join(permissions))
def _get_user_details(self, server, nickname):
target = server.get_user(nickname)
registered = bool(target.get_setting("authentication", None))
permissions = target.get_setting("permissions", [])
return [target, registered, permissions]
@utils.hook("received.command.givepermission", min_args=2)
def give_permission(self, event):
"""
:help: Give a given permission to a given user
:usage: <nickname> <permission>
:permission: givepermission
"""
permission = event["args_split"][1].lower()
target, registered, permissions = self._get_user_details(
event["server"], event["args_split"][0])
if target.get_identified_account() == None:
raise utils.EventError("%s isn't registered" % target.nickname)
if permission in permissions:
event["stderr"].write("%s already has permission '%s'" % (
target.nickname, permission))
else:
permissions.append(permission)
target.set_setting("permissions", permissions)
event["stdout"].write("Gave permission '%s' to %s" % (
permission, target.nickname))
@utils.hook("received.command.removepermission", min_args=2)
def remove_permission(self, event):
"""
:help: Remove a given permission from a given user
:usage: <nickname> <permission>
:permission: removepermission
"""
permission = event["args_split"][1].lower()
target, registered, permissions = self._get_user_details(
event["server"], event["args_split"][0])
if target.identified_account == None:
raise utils.EventError("%s isn't registered" % target.nickname)
if permission not in permissions:
event["stderr"].write("%s doesn't have permission '%s'" % (
target.nickname, permission))
else:
permissions.remove(permission)
if not permissions:
target.del_setting("permissions")
if not allowed == None:
if allowed:
return utils.consts.PERMISSION_FORCE_SUCCESS, None
else:
target.set_setting("permissions", permissions)
event["stdout"].write("Removed permission '%s' from %s" % (
permission, target.nickname))
return utils.consts.PERMISSION_ERROR, NO_PERMISSION

View file

@ -11,17 +11,15 @@ class User(IRCObject.Object):
self.server = server
self.set_nickname(nickname)
self._id = id
self._id_override: typing.Optional[int] = None
self.username: typing.Optional[str] = None
self.hostname: typing.Optional[str] = None
self.realname: typing.Optional[str] = None
self.bot = bot
self.channels: typing.Set[IRCChannel.Channel] = set([])
self.identified_account = None
self.identified_account_override = None
self.account = None
self.identified_account_id = None
self.identified_account_id_override = None
self.away = False
self.away_message: typing.Optional[str] = None
@ -42,10 +40,7 @@ class User(IRCObject.Object):
return None
def get_id(self)-> int:
return (self.identified_account_id_override or
self.identified_account_id or self._id)
def get_identified_account(self) -> typing.Optional[str]:
return (self.identified_account_override or self.identified_account)
return self._id_override or self._id
def set_nickname(self, nickname: str):
self.nickname = nickname