From ada63855c063c7294cb01b4c21a36315ef1c6784 Mon Sep 17 00:00:00 2001 From: Corbin Date: Fri, 10 Nov 2023 11:55:54 -0500 Subject: [PATCH] Refactor dkim_selctors, add app to output, more. --- email_dns.py | 185 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 112 insertions(+), 73 deletions(-) diff --git a/email_dns.py b/email_dns.py index 4ef92ac..21ed058 100755 --- a/email_dns.py +++ b/email_dns.py @@ -1,118 +1,157 @@ #!/usr/bin/python3 import sys +from dns import resolver import re import json -import dns.resolver -import asyncio -dkim_selectors = [ # ._domainkey. - 'dkim', - 'google', # Google - 'selector', 'selector1', 'selector2', 'selector3', # Microsoft - 'fm1', 'fm2', 'fm3', 'mesmtp', # FastMail - 'protonmail', 'protonmail1', 'protonmail2', # ProtonMail - 'sg', 's1', 'sg2', 's2', 's3', # SendGrid - 'ctct1', 'ctct2', # Constant Contact - 'k1', # MailChimp - 'mandrill', # MailChimp Add-on - 'pic', 'smtp', # Mailgun (Very generic, thank you, Mailgun) - 'bh', # Bullhorn - 'cm' # Campaign Monitor -] +dkim_selectors = { # ._domainkey. + 'none': [ + 'dkim' + ], + 'google': [ + 'google' + ], + 'exchange': [ + 'selector','selector1', 'selector2', 'selector3' + ], + 'fastmail': [ + 'fm1', 'fm2', 'fm3', 'mesmtp' + ], + 'protonmail': [ + 'protonmail', 'protonmail1', 'protonmail2' + ], + 'sendgrid': [ + 'sg', 's1', 'sg2', 's2', 's3' + ], + 'constant-contact': [ + 'ctct1', 'ctct2' + ], + # MailChimp - Usually CNAME to dkimX.mcsv.net + 'mailchimp': [ + 'k1', 'k2', 'k3' + ], + # MailChimp Add-on + 'mandrill': [ + 'mandrill' + ], + 'mailgun': [ + 'pic', 'smtp', # Mailgun (Very generic, thank you, Mailgun) + ], + 'bullhorn': [ + 'bh' + ], + 'campaign-monitor': [ + 'cm' + ], + 'presswise': [ + 'presswise' + ] +} -domains = [] +def process_args(args: list) -> list: + domains = [] + args.pop(0) -def safe_resolve(name: str, type: str) -> dns.resolver.Answer: + for arg in args: + domains.append(Domain(name=arg)) + + return domains + +def safe_resolve(name: str, type: str) -> resolver.Answer: try: - return dns.resolver.resolve(name, type) - except dns.resolver.NoAnswer: + answer = resolver.resolve(name, type) + return answer + # We don't necessarily care if the name doesn't exist + except resolver.NXDOMAIN: return None - except dns.resolver.NXDOMAIN: - return None - except dns.exception.Timeout: + except resolver.NoAnswer: return None class Domain: - def __init__(self, name: str, mx: str = [], spf: str = None, dkim: list = [], dmarc: str = None) -> None: + # def __init__(self, name: str, mx: str = [], spf: str = None, dkim: list = [], dmarc: str = None) -> None: + def __init__(self, name: str) -> None: self.name = name - self.mx = mx - self.spf = spf - self.dkim = dkim - self.dmarc = dmarc + self.mx = [] + self.spf = None + self.dkim = [] + self.dmarc = None def resolve_mx(self) -> None: + # Attempt to resolve the MX record for the domain answer = safe_resolve(self.name, 'MX') if answer: for a in answer: + # Loop through each MX record, record its data and priority self.mx.append({ 'exchanger': re.sub(r'^\d+\s', '', str(a)), - 'preference': re.search(r'^(\d+)', str(a)).groups()[0] + 'priority': re.search(r'^(\d+)', str(a)).groups()[0] }) - def get_mx(self) -> list: - return self.mx - - # TODO: Recurse for each 'include:' in SPF def resolve_spf(self) -> None: answer = safe_resolve(self.name, 'TXT') if answer: for a in answer: if str(a).lower().find('v=spf1') != -1: self.spf = str(a).replace('"', '') - matches = re.search(r'(?:include:)(.*)(?=\s)', str(a).lower()) - domains = [] - if matches: - for match in matches: - domains.append(Domain(match.group(1))) - return domains + # TODO: Recurse through includes + # matches = re.search(r'(?:include:)(.*)(?=\s)', str(a).lower()) + # domains = set() + # if matches: + # for match in matches: + # domains.add(Domain(match.group(1))) + # return domains break else: self.spf = None - - def get_spf(self) -> str: - return self.spf def resolve_dkim(self) -> None: - for s in dkim_selectors: - # Search for CNAME DKIM records - answer = safe_resolve(f'{s}._domainkey.{self.name}', 'CNAME') + # Loop through our pre-defined selectors list, an attempt to resolve them all + for app, selectors in dkim_selectors.items(): + for s in selectors: + # Search for CNAME DKIM records + answer = safe_resolve(f'{s}._domainkey.{self.name}', 'CNAME') + if answer: + self.dkim.append({'application': app, 'selector': s, 'type': 'CNAME', 'value': str(answer[0]).replace('"', '')}) + # This is a CNAME, we don't have to check if it's a TXT + continue + # Search for TXT DKIM records + answer = safe_resolve(f'{s}._domainkey.{self.name}', 'TXT') + if answer: + if (str(answer[0]).lower().find('v=dkim1') != -1) or (str(answer[0]).lower().find('k=rsa') != -1): + self.dkim.append({'application': app, 'selector': s, 'type': 'TXT', 'value': str(answer[0]).replace('"', '')}) + # Check for TXT DKIM record at root + answer = safe_resolve(self.name, 'TXT') if answer: - self.dkim.append({'type': 'CNAME', 'value': str(answer[0]).replace('"', '')}) - continue - # Search for TXT DKIM records - answer = safe_resolve(f'{s}._domainkey.{self.name}', 'TXT') - if answer: - if (str(answer[0]).lower().find('v=dkim1') != -1) or (str(answer[0]).lower().find('k=rsa') != -1): - self.dkim.append({'type': 'TXT', 'value': str(answer[0]).replace('"', '')}) - # Check for TXT DKIM record - answer = safe_resolve(self.name, 'TXT') - if answer: - for a in answer: - if (str(a).lower().find('v=dkim1') != -1) or (str(a).lower().find('k=rsa') != -1): - self.dkim.append({'type': 'TXT', 'value': str(answer[0]).replace('"', '')}) - - def get_dkim(self) -> list: - return self.dkim + for a in answer: + # Loop through all root TXT records + if (str(a).lower().find('v=dkim1') != -1) or (str(a).lower().find('k=rsa') != -1): + # If the record data includes `v=dkim1` then record it + self.dkim.append({'application': app, 'selector': None, 'type': 'TXT', 'value': str(answer[0]).replace('"', '')}) def resolve_dmarc(self) -> None: + # Attempt to resolve the DMARC record for the domain answer = safe_resolve(f'_dmarc.{self.name}', 'TXT') if answer: self.dmarc = str(answer[0]).replace('"', '') else: self.dmarc = None - - def get_dmarc(self) -> str: - return self.dmarc - def resolve_email_records(self) -> None: + def resolve(self) -> None: + # Resolve all records self.resolve_mx() self.resolve_spf() - self.resolve_dkim() + self.resolve_dkim() self.resolve_dmarc() + + +def main(): + domains = process_args(sys.argv) + + for domain in domains: + domain.resolve() + + # Pretty print Domain dict as JSON + print(json.dumps([domain.__dict__ for domain in domains], indent=4)) + if __name__ == "__main__": - - for arg in sys.argv[1:]: - domain = Domain(arg) - domain.resolve_email_records() - domains.append(domain) - - print(json.dumps([domain.__dict__ for domain in domains], indent=4)) \ No newline at end of file + + main() \ No newline at end of file