From 57760a4ad76e5b224cafe1be38e6c0bb14f99bf3 Mon Sep 17 00:00:00 2001 From: jesopo Date: Thu, 12 Dec 2019 13:14:14 +0000 Subject: [PATCH] add dnsbl.py --- modules/dnsbl.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 modules/dnsbl.py diff --git a/modules/dnsbl.py b/modules/dnsbl.py new file mode 100644 index 00000000..d2e3135d --- /dev/null +++ b/modules/dnsbl.py @@ -0,0 +1,50 @@ +import ipaddress +from src import ModuleManager, utils +import dns.resolver + +DEFAULT_LISTS = [ + "rbl.efnetrbl.org", + "zen.spamhaus.org" +] + +class Module(ModuleManager.BaseModule): + @utils.hook("received.command.dnsbl") + def dnsbl(self, event): + args = event["args_split"] + + lists = [] + for i, arg in reversed(list(enumerate(args))): + if arg[0] == "@": + lists.insert(args.pop(i)) + lists = lists or DEFAULT_LISTS + + address = args[0] + failed = self._check_lists(lists, address) + if failed: + event["stderr"].write("%s failed for lists: %s" % + (address, ", ".join(failed))) + else: + event["stdout"].write("%s not found in blacklists" % address) + + def _check_lists(self, lists, address): + address_obj = ipaddress.ip_address(address) + + if address_obj.version == 6: + address = reversed(address_obj.exploded.replace(":", "")) + else: + address = reversed(address.split(".")) + address = ".".join(address) + + failed = [] + for list in lists: + if not self._check_list(list, address): + failed.append(list) + return failed + + def _check_list(self, list, address): + list_address = "%s.%s" % (address, list) + try: + dns.resolver.query(list_address) + except dns.resolver.NXDOMAIN: + return False + return True