Refactor dkim_selctors, add app to output, more.

This commit is contained in:
Corbin 2023-11-10 11:55:54 -05:00
parent df309892ac
commit ada63855c0

View File

@ -1,118 +1,157 @@
#!/usr/bin/python3 #!/usr/bin/python3
import sys import sys
from dns import resolver
import re import re
import json import json
import dns.resolver
import asyncio
dkim_selectors = [ # <selector>._domainkey.<domain> dkim_selectors = { # <selector>._domainkey.<domain>
'dkim', 'none': [
'google', # Google 'dkim'
'selector', 'selector1', 'selector2', 'selector3', # Microsoft ],
'fm1', 'fm2', 'fm3', 'mesmtp', # FastMail 'google': [
'protonmail', 'protonmail1', 'protonmail2', # ProtonMail 'google'
'sg', 's1', 'sg2', 's2', 's3', # SendGrid ],
'ctct1', 'ctct2', # Constant Contact 'exchange': [
'k1', # MailChimp 'selector','selector1', 'selector2', 'selector3'
'mandrill', # MailChimp Add-on ],
'pic', 'smtp', # Mailgun (Very generic, thank you, Mailgun) 'fastmail': [
'bh', # Bullhorn 'fm1', 'fm2', 'fm3', 'mesmtp'
'cm' # Campaign Monitor ],
] 'protonmail': [
'protonmail', 'protonmail1', 'protonmail2'
],
'sendgrid': [
'sg', 's1', 'sg2', 's2', 's3'
],
'constant-contact': [
'ctct1', 'ctct2'
],
# MailChimp - Usually CNAME to dkimX.mcsv.net
'mailchimp': [
'k1', 'k2', 'k3'
],
# MailChimp Add-on
'mandrill': [
'mandrill'
],
'mailgun': [
'pic', 'smtp', # Mailgun (Very generic, thank you, Mailgun)
],
'bullhorn': [
'bh'
],
'campaign-monitor': [
'cm'
],
'presswise': [
'presswise'
]
}
domains = [] def process_args(args: list) -> list:
domains = []
args.pop(0)
def safe_resolve(name: str, type: str) -> dns.resolver.Answer: for arg in args:
domains.append(Domain(name=arg))
return domains
def safe_resolve(name: str, type: str) -> resolver.Answer:
try: try:
return dns.resolver.resolve(name, type) answer = resolver.resolve(name, type)
except dns.resolver.NoAnswer: return answer
# We don't necessarily care if the name doesn't exist
except resolver.NXDOMAIN:
return None return None
except dns.resolver.NXDOMAIN: except resolver.NoAnswer:
return None
except dns.exception.Timeout:
return None return None
class Domain: class Domain:
def __init__(self, name: str, mx: str = [], spf: str = None, dkim: list = [], dmarc: str = None) -> None: # def __init__(self, name: str, mx: str = [], spf: str = None, dkim: list = [], dmarc: str = None) -> None:
def __init__(self, name: str) -> None:
self.name = name self.name = name
self.mx = mx self.mx = []
self.spf = spf self.spf = None
self.dkim = dkim self.dkim = []
self.dmarc = dmarc self.dmarc = None
def resolve_mx(self) -> None: def resolve_mx(self) -> None:
# Attempt to resolve the MX record for the domain
answer = safe_resolve(self.name, 'MX') answer = safe_resolve(self.name, 'MX')
if answer: if answer:
for a in answer: for a in answer:
# Loop through each MX record, record its data and priority
self.mx.append({ self.mx.append({
'exchanger': re.sub(r'^\d+\s', '', str(a)), 'exchanger': re.sub(r'^\d+\s', '', str(a)),
'preference': re.search(r'^(\d+)', str(a)).groups()[0] 'priority': re.search(r'^(\d+)', str(a)).groups()[0]
}) })
def get_mx(self) -> list:
return self.mx
# TODO: Recurse for each 'include:' in SPF
def resolve_spf(self) -> None: def resolve_spf(self) -> None:
answer = safe_resolve(self.name, 'TXT') answer = safe_resolve(self.name, 'TXT')
if answer: if answer:
for a in answer: for a in answer:
if str(a).lower().find('v=spf1') != -1: if str(a).lower().find('v=spf1') != -1:
self.spf = str(a).replace('"', '') self.spf = str(a).replace('"', '')
matches = re.search(r'(?:include:)(.*)(?=\s)', str(a).lower()) # TODO: Recurse through includes
domains = [] # matches = re.search(r'(?:include:)(.*)(?=\s)', str(a).lower())
if matches: # domains = set()
for match in matches: # if matches:
domains.append(Domain(match.group(1))) # for match in matches:
return domains # domains.add(Domain(match.group(1)))
# return domains
break break
else: self.spf = None else: self.spf = None
def get_spf(self) -> str:
return self.spf
def resolve_dkim(self) -> None: def resolve_dkim(self) -> None:
for s in dkim_selectors: # Loop through our pre-defined selectors list, an attempt to resolve them all
# Search for CNAME DKIM records for app, selectors in dkim_selectors.items():
answer = safe_resolve(f'{s}._domainkey.{self.name}', 'CNAME') for s in selectors:
# Search for CNAME DKIM records
answer = safe_resolve(f'{s}._domainkey.{self.name}', 'CNAME')
if answer:
self.dkim.append({'application': app, 'selector': s, 'type': 'CNAME', 'value': str(answer[0]).replace('"', '')})
# This is a CNAME, we don't have to check if it's a TXT
continue
# Search for TXT DKIM records
answer = safe_resolve(f'{s}._domainkey.{self.name}', 'TXT')
if answer:
if (str(answer[0]).lower().find('v=dkim1') != -1) or (str(answer[0]).lower().find('k=rsa') != -1):
self.dkim.append({'application': app, 'selector': s, 'type': 'TXT', 'value': str(answer[0]).replace('"', '')})
# Check for TXT DKIM record at root
answer = safe_resolve(self.name, 'TXT')
if answer: if answer:
self.dkim.append({'type': 'CNAME', 'value': str(answer[0]).replace('"', '')}) for a in answer:
continue # Loop through all root TXT records
# Search for TXT DKIM records if (str(a).lower().find('v=dkim1') != -1) or (str(a).lower().find('k=rsa') != -1):
answer = safe_resolve(f'{s}._domainkey.{self.name}', 'TXT') # If the record data includes `v=dkim1` then record it
if answer: self.dkim.append({'application': app, 'selector': None, 'type': 'TXT', 'value': str(answer[0]).replace('"', '')})
if (str(answer[0]).lower().find('v=dkim1') != -1) or (str(answer[0]).lower().find('k=rsa') != -1):
self.dkim.append({'type': 'TXT', 'value': str(answer[0]).replace('"', '')})
# Check for TXT DKIM record
answer = safe_resolve(self.name, 'TXT')
if answer:
for a in answer:
if (str(a).lower().find('v=dkim1') != -1) or (str(a).lower().find('k=rsa') != -1):
self.dkim.append({'type': 'TXT', 'value': str(answer[0]).replace('"', '')})
def get_dkim(self) -> list:
return self.dkim
def resolve_dmarc(self) -> None: def resolve_dmarc(self) -> None:
# Attempt to resolve the DMARC record for the domain
answer = safe_resolve(f'_dmarc.{self.name}', 'TXT') answer = safe_resolve(f'_dmarc.{self.name}', 'TXT')
if answer: self.dmarc = str(answer[0]).replace('"', '') if answer: self.dmarc = str(answer[0]).replace('"', '')
else: self.dmarc = None else: self.dmarc = None
def get_dmarc(self) -> str:
return self.dmarc
def resolve_email_records(self) -> None: def resolve(self) -> None:
# Resolve all records
self.resolve_mx() self.resolve_mx()
self.resolve_spf() self.resolve_spf()
self.resolve_dkim() self.resolve_dkim()
self.resolve_dmarc() self.resolve_dmarc()
def main():
domains = process_args(sys.argv)
for domain in domains:
domain.resolve()
# Pretty print Domain dict as JSON
print(json.dumps([domain.__dict__ for domain in domains], indent=4))
if __name__ == "__main__": if __name__ == "__main__":
for arg in sys.argv[1:]: main()
domain = Domain(arg)
domain.resolve_email_records()
domains.append(domain)
print(json.dumps([domain.__dict__ for domain in domains], indent=4))