Commit cda60c35 authored by Robert Labudda's avatar Robert Labudda
Browse files

findpgpkey

parent 5893ffda
......@@ -59,6 +59,18 @@ ask you for it.
Requirements: bash, gpg, pinentry
findpgpkey.py
-------------
Python script to obtain PGP keys from an email. The intended use-case is to
have it run from within your mutt or neomutt: pipe the message into
`findpgpkey.py -m` and it will try its best to locate the PGP key in all the
weird places and import it into your keyring.
Requirements: python, gpg
smount
------
......
#!/usr/bin/env python3
"""Find PGP keys from the headers of an email or otherwise
This script is supposed to be run from within mutt or other email programs
to obtain the PGP key of a sender based on the information found in the
header.
The script will try to get information from the header, like the
``OpenPGP`` header, the ``Autocrypt`` header, various non-standard headers
(``X-PGP``, ``X-PGP-Key``, etc).
Then it will look at the ``From`` and ``Return-Path`` to try and obtain the
PGP keys from public sources like WKD, DANE, DNS cert, and other methods that
are supported by your local GnuPG.
And at last the keyservers are queried.
To use this in your mutt configuration you will probably want to bind it to
some key in a macro, like this:
macro index k '<pipe-message>findpgpkey.py -m<enter>'
"""
import argparse
import base64
import sys
import hashlib
import subprocess
import re
import json
import string
from urllib.request import urlopen
from email.parser import BytesParser
ADDRESSPATTERN = re.compile(r'^[^<]*<(.*)>$')
def eprint(*args, **kwargs):
if 'file' not in kwargs:
kwargs['file'] = sys.stderr
return print(*args, **kwargs)
def strip_comment(part):
if part.startswith("(") and ")" in part:
end = part.find(")")
part = part[end+1:]
if part.endswith(")") and "(" in part:
start = part.rfind("(")
part = part[:start]
return part
def clean_up_address(text):
match = ADDRESSPATTERN.match(text)
if match is None:
return text
else:
return match.group(1)
return None
def create_variants(address):
address = clean_up_address(address)
if address is None:
return set()
localpart, domain = address.rsplit('@', 1)
domain = strip_comment(domain)
sender = f'{localpart}@{domain}'
variants = set([sender])
fixed = strip_comment(localpart)
variants.add(f'{fixed}@{domain}')
if '+' in fixed:
fixed = fixed[:fixed.index('+')]
variants.add(f'{fixed}@{domain}')
variants.add(sender)
return variants
def parse_openpgp_header(text):
while "\n " in text:
text = text.replace("\n ", "")
result = {}
while len(text) > 0:
text = text.strip()
if text.startswith('id='):
keyid = ''
idx = 3
while idx < len(text):
ch = text[idx]
idx += 1
if ch == ';':
idx += 1
break
else:
keyid += ch
text = text[idx:].strip()
if len(keyid) in [8, 16, 32, 40]:
result['id'] = keyid
elif text.startswith('url='):
url = ''
idx = 4
quoted = text[idx] == '"'
if quoted:
idx += 1
end_of_url = '"' if quoted else ';'
escaped = False
while idx < len(text):
ch = text[idx]
idx += 1
if escaped:
url += ch
escaped = False
else:
if ch == '\\':
escaped = True
elif ch == end_of_url:
idx += 1
break
else:
url += ch
text = text[idx:].strip()
print(url)
if len(url) > 0:
result['url'] = url
elif text.startswith('preference='):
pref = ''
idx = 11
while idx < len(text):
ch = text[idx]
idx += 1
if ch == ';':
idx += 1
break
else:
pref += ch
pref = pref.lower()
text = text[idx:].strip()
if pref in ['unprotected', 'sign', 'encrypt', 'signencrypt']:
result['preference'] = pref
else:
break
return result
def parse_autocrypt_header(text):
while "\n " in text:
text = text.replace("\n ", "")
result = {}
while len(text) > 0:
text = text.strip()
if text.startswith('addr='):
addr = ''
idx = 5
while idx < len(text):
ch = text[idx]
idx += 1
if ch == ';':
idx += 1
break
else:
addr += ch
text = text[idx:].strip()
if len(addr) > 0:
result['addr'] = addr
elif text.startswith('keydata='):
keydata = ''
idx = 8
while idx < len(text):
ch = text[idx]
idx += 1
if ch == ';':
idx += 1
break
else:
keydata += ch
text = text[idx:].strip()
if len(keydata) > 0:
result['keydata'] = keydata
elif text.startswith('prefer-encrypt='):
pref = ''
idx = 15
while idx < len(text):
ch = text[idx]
idx += 1
if ch == ';':
idx += 1
break
else:
pref += ch
pref = pref.lower()
text = text[idx:].strip()
if pref in ['mutual']:
result['prefer-encrypt'] = pref
else:
break
return result
def gpg_import(keydata):
success = False
with subprocess.Popen(['gpg2', '--import'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as gpg:
gpg.stdin.write(keydata)
gpg.stdin.close()
gpg.wait()
result = gpg.stderr.read()
if gpg.returncode == 0 and result is not None:
output = str(result, 'utf-8')
for line in output.split("\n"):
print(line)
if line.startswith('gpg: Total number processed:'):
_, processed = line.rsplit(' ', 1)
processed = int(processed.strip())
if processed > 0:
success = True
return success
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--address',
action='append',
help='Find public key for this address')
parser.add_argument('-i', '--key-id',
action='append',
help='Find public key for this key ID')
parser.add_argument('-m', '--mail',
action='store_true',
default=False,
help='Read email from stdin and obtain public key of the sender')
parser.add_argument('-u', '--url',
action='append',
help='Find public key at this URL')
args = parser.parse_args(sys.argv[1:])
addresses = []
if args.address is not None:
for address in args.address:
if '@' in address:
addresses.append(create_variants(address))
urls = []
if args.url is not None:
for url in args.url:
urls.append(url)
keyids = []
if args.key_id is not None:
for keyid in args.key_id:
if keyid.lower().startswith('0x'):
keyid = keyid[2:]
keyids.append(keyid)
if args.mail:
parser = BytesParser()
eml = parser.parse(sys.stdin.buffer)
for key in ['From', 'Return-Path']:
variants = create_variants(eml[key])
addresses.append(variants)
# check for OpenPGP header
if 'OpenPGP' in eml:
data = parse_openpgp_header(eml['OpenPGP'])
if 'url' in data:
urls.append(data['url'])
if 'id' in data:
keyids.append(data['id'])
# check for Autocrypt header
if 'Autocrypt' in eml:
data = parse_autocrypt_header(eml['Autocrypt'])
if 'addr' not in data or data['addr'] not in addresses[-1]:
# addr in Autocrypt must match the From address
# but matching a variant is probably good enough?
pass
elif 'keydata' in data:
if gpg_import(base64.b64decode(bytes(data['keydata'], 'utf-8'))):
# this key is authoritative
sys.exit(0)
# best effort for non-standard headers
for header in ['X-PGP', 'X-PGP-Key', 'X-PGP-KeyID', 'X-PGP-Fingerprint']:
if header not in eml:
continue
value = eml[header].strip()
if value.lower().startswith('0x'):
keyids.append(value[2:])
if '://' in value:
urls.append(value)
if ''.join([ch for ch in value if ch in string.hexdigits]) == value:
keyids.append(value)
for keyid in keyids:
# fetch URLs from keybase
try:
result = urlopen(f'https://keybase.io/_/api/1.0/user/lookup.json?key_fingerprint={keyid}&fields=basics')
except Exception as exc:
continue
if result.status == 200 and result.length > 0:
try:
blob = json.loads(str(result.read(), 'utf-8'))
except json.JSONDecodeError as exc:
continue
if isinstance(blob, dict) and 'them' in blob and len(blob['them']) >= 1:
them = blob['them'][0]
username = them.get('basics', dict()).get('username', None)
if username is not None:
urls.append(f'https://keybase.io/{username}/pgp_keys.asc')
continue
# only fetch from keyserver if keybase did not find anything
result = subprocess.run(['gpg2',
'--search-keys',
'0x'+keyid], stdout=subprocess.PIPE)
for url in urls:
try:
result = urlopen(url)
except Exception as exc:
result = None
if result is not None:
content = result.read()
if result.code == 200 and len(content) > 0:
if gpg_import(content):
continue
# previous fetching somehow failed? try again with gpg natively
result = subprocess.run(['gpg2',
'--fetch-keys',
url], stdout=subprocess.PIPE)
if result.returncode == 0 and len(result.stdout) > 0:
print(str(result.stdout, 'utf-8'))
for address in sorted(addresses):
for variant in address:
result = subprocess.run(['gpg2',
'--auto-key-locate',
'clear,nodefault,wkd,dane,pka',
'--no-batch',
'--locate-keys', variant],
stdout=subprocess.PIPE)
if result.returncode == 0 and len(result.stdout) > 0:
print(str(result.stdout, 'utf-8'))
break
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment