refactor dnsbl module, show reason for positive detection when possible

This commit is contained in:
jesopo 2019-12-12 14:13:17 +00:00
parent 39a4e97901
commit fbe1acf220
2 changed files with 55 additions and 12 deletions

View file

@ -1,26 +1,29 @@
import ipaddress import ipaddress
from src import ModuleManager, utils from src import ModuleManager, utils
import dns.resolver import dns.resolver
from . import lists as _lists
DEFAULT_LISTS = [
"rbl.efnetrbl.org",
"zen.spamhaus.org"
]
class Module(ModuleManager.BaseModule): class Module(ModuleManager.BaseModule):
@utils.hook("received.command.dnsbl") @utils.hook("received.command.dnsbl")
def dnsbl(self, event): def dnsbl(self, event):
args = event["args_split"] args = event["args_split"]
default_lists = _lists.default_lists()
lists = [] lists = []
for i, arg in reversed(list(enumerate(args))): for i, arg in reversed(list(enumerate(args))):
if arg[0] == "@": if arg[0] == "@":
lists.insert(0, args.pop(i)) hostname = args.pop(i)
lists = lists or DEFAULT_LISTS if hostname in default_lists:
lists.insert(0, default_lists[hostname])
else:
lists.insert(0, lists.DNSBL(hostname))
lists = lists or list(default_lists.values())
address = args[0] address = args[0]
failed = self._check_lists(lists, address) failed = self._check_lists(lists, address)
if failed: if failed:
failed = ["%s (%s)" % item for item in failed]
event["stderr"].write("%s failed for lists: %s" % event["stderr"].write("%s failed for lists: %s" %
(address, ", ".join(failed))) (address, ", ".join(failed)))
else: else:
@ -37,14 +40,14 @@ class Module(ModuleManager.BaseModule):
failed = [] failed = []
for list in lists: for list in lists:
if not self._check_list(list, address): record = self._check_list(list.hostname, address)
failed.append(list) if not record == None:
failed.append((list.hostname, list.process(record)))
return failed return failed
def _check_list(self, list, address): def _check_list(self, list, address):
list_address = "%s.%s" % (address, list) list_address = "%s.%s" % (address, list)
try: try:
dns.resolver.query(list_address) return dns.resolver.query(list_address, "A")[0].to_text()
except dns.resolver.NXDOMAIN: except dns.resolver.NXDOMAIN:
return True return None
return False

40
modules/dnsbl/lists.py Normal file
View file

@ -0,0 +1,40 @@
import collections
class DNSBL(object):
def __init__(self, hostname=None):
if not hostname == None:
self.hostname = hostname
def process(self, result: str):
return "unknown"
class ZenSpamhaus(DNSBL):
hostname = "zen.spamhaus.org"
def process(self, result):
result = result.rsplit(".", 1)[1]
if result in ["2", "3", "9"]:
return "spam"
elif result in ["4", "5", "6", "7"]:
return "exploits"
class EFNetRBL(DNSBL):
hostname = "rbl.efnetrbl.org"
SPAMTRAP = ["2", "3"]
def process(self, result):
result = result.rsplit(".", 1)[1]
if result == "1":
return "proxy"
elif result in self.SPAMTRAP:
return "spamtap"
elif result == "4":
return "tor"
elif result == "5":
return "flooding"
DEFAULT_LISTS = [
ZenSpamhaus(),
EFNetRBL()
]
def default_lists():
return collections.OrderedDict(
(dnsbl.hostname, dnsbl) for dnsbl in DEFAULT_LISTS)