change encrypted channel logs to use RSA -> AES (CBC)

closes #248
This commit is contained in:
jesopo 2020-02-24 13:14:05 +00:00
parent 9d120dcd2c
commit 6535ec731c
3 changed files with 73 additions and 27 deletions

View file

@ -15,13 +15,26 @@ import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import padding as a_padding
def a_decrypt(key, data):
out = key.decrypt(base64.b64decode(data), padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
def rsa_decrypt(key, data):
return key.decrypt(base64.b64decode(data), a_padding.OAEP(
mgf=a_padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(), label=None))
return out.decode("utf8")
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
def aes_decrypt(key: bytes, data_str: str):
data_bytes = base64.b64decode(data_str)
iv, data_bytes = data_bytes[:16], data_bytes[16:]
decryptor = Cipher(algorithms.AES(key), modes.CBC(iv),
backend=default_backend()).decryptor()
plain = decryptor.update(data_bytes)+decryptor.finalize()
unpadder = padding.PKCS7(256).unpadder()
return (unpadder.update(plain)+unpadder.finalize()).decode("utf8")
with open(args.key, "rb") as key_file:
key_content = key_file.read()
@ -32,7 +45,17 @@ with open(args.log) as log_file:
lines = log_file.read().split("\n")
lines = filter(None, lines)
symm_key = None
for line in lines:
printable = None
if line[0] == "\x02":
line = a_decrypt(key, line[1:])
print(line)
printable = rsa_decrypt(key, line[1:]).decode("utf8")
elif line[0] == "\x03":
symm_key = rsa_decrypt(key, line[1:])
elif line[0] == "\x04":
printable = aes_decrypt(symm_key, line[1:])
else:
printable = line
if not printable == None:
print(printable)

View file

@ -24,18 +24,35 @@ class Module(ModuleManager.BaseModule):
# forbidden in channel names.
sanitised_name = channel_name.replace(os.path.sep, ",")
return self.data_directory("%s/%s.log" % (server_name, sanitised_name))
def _write_line(self, channel, line):
channel.__log_file.write("%s\n" % line)
def _write(self, channel, filename, key, line):
if not hasattr(channel, "__log_file"):
channel.__log_file = open(filename, "a")
channel.__log_rsa = None
channel.__log_aes = None
if key and not key == channel.__log_rsa:
aes_key = utils.security.aes_key()
channel.__log_rsa = key
channel.__log_aes = aes_key
aes_key_line = utils.security.rsa_encrypt(key, aes_key)
self._write_line(channel, "\x03%s" % aes_key_line)
if not channel.__log_aes == None:
line = "\x04%s" % utils.security.aes_encrypt(
channel.__log_aes, line)
self._write_line(channel, line)
def _log(self, server, channel, line):
if self._enabled(server, channel):
filename = self._file(str(server), str(channel))
timestamp = utils.datetime.format.datetime_human(
datetime.datetime.now())
log_line = "%s %s" % (timestamp, line)
if "log-key" in self.bot.config:
log_line = "\x02%s" % utils.security.a_encrypt(
self.bot.config["log-key"], log_line)
with open(self._file(str(server), str(channel)), "a") as log_file:
log_file.write("%s\n" % log_line)
self._write(channel, filename, self.bot.config.get("log-key"),
log_line)
@utils.hook("formatted.message.channel")
@utils.hook("formatted.notice.channel")

View file

@ -39,24 +39,30 @@ def hash_verify(salt: str, data: str, compare: str):
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import padding as a_padding
def a_encrypt(key_filename: str, data: str):
def rsa_encrypt(key_filename: str, data: bytes) -> str:
with open(key_filename, "rb") as key_file:
key_content = key_file.read()
key = serialization.load_pem_public_key(
key_content, backend=default_backend())
out = key.encrypt(data.encode("utf8"), padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
out = key.encrypt(data, a_padding.OAEP(
mgf=a_padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(), label=None))
return base64.b64encode(out).decode("iso-8859-1")
def a_decrypt(key_filename: str, data: str):
with open(key_filename, "rb") as key_file:
key_content = key_file.read()
key = serialization.load_pem_private_key(
key_content, password=None, backend=default_backend())
out = key.decrypt(base64.b64decode(data), padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(), label=None))
return out.decode("utf8")
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
def aes_key() -> bytes:
return os.urandom(32)
def aes_encrypt(key: bytes, data: str) -> str:
iv = os.urandom(16)
padder = padding.PKCS7(256).padder()
data_bytes = padder.update(data.encode("utf8"))+padder.finalize()
encryptor = Cipher(algorithms.AES(key), modes.CBC(iv),
backend=default_backend()).encryptor()
ct = encryptor.update(data_bytes)+encryptor.finalize()
return base64.b64encode(iv+ct).decode("latin-1")