You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

572 lines
19 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
Tor Contact Info Parser - A tool/Python Class for parsing Tor ContactInfo Information Sharing v2 specification contacts
Written by Eran Sandler (https://twitter.com/erans) (C) 2018
Turned into a proper command-line tool with sub-commands and flags by @Someguy123 at Privex Inc. (C) 2021
(https://www.privex.io) (https://github.com/PrivexInc)
This is a parser for the Tor ContactInfo Information Sharing Specification v2 (https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/).
The parser can parse the ContactInfo field of Tor relays based on the specification.
Official Repo: https://github.com/erans/torcontactinfoparser
Privex Fork: https://github.com/Privex/torcontactinfoparser
Released under the MIT License.
"""
import argparse
import os
import re
import sys
import json
import logging
import warnings
import requests
import textwrap
# The optional 'rich' import is switched off (see the commented-out lines);
# the logging-based rprint() below is always the one in use.
# from rich import print as rprintxxx
# HAS_RICH = True
warnings.filterwarnings('ignore')
LOG = logging.getLogger()


def rprint(value='', *args, **kwargs):
    """Log ``value`` at DEBUG level, rendering containers as indented JSON.

    dict, list, tuple and set values are converted to a JSON string with a
    4-space indent before logging; every other value is handed to the logger
    unchanged. Extra positional and keyword arguments are forwarded to
    ``LOG.debug``.

    Returns whatever ``LOG.debug`` returns (``None``).
    """
    if isinstance(value, set):
        # JSON has no set type; emit the members as a list in a stable order.
        value = sorted(value, key=repr)
    if isinstance(value, (dict, list, tuple)):
        value = json.dumps(value, indent=4)
    return LOG.debug(value, *args, **kwargs)
# rprint = print
# HAS_RICH = False
class TorContactInfoParser(object):
    """Parse a Tor relay ContactInfo string into a dict of validated fields.

    A ContactInfo string is treated as whitespace-separated ``name:value``
    tokens. Each known field name is looked up in
    ``_supported_fields_parsers`` and its value is validated by the parser
    function registered there. Tokens without a colon and unknown field
    names are ignored.
    """

    # Syntax accepted for e-mail style fields, checked after "[]" has been
    # mapped back to "@".
    email_regex = r"^[a-zA-Z0-9.!#$%&*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*$"

    def _parse_string_value(self, value, min_length, max_length, valid_chars,
                            raise_exception=False, field_name=None, deobfuscate_email=False):
        """Validate a plain string field.

        :param value: raw field value (the text after ``name:``).
        :param min_length: minimum accepted length, inclusive.
        :param max_length: maximum accepted length, inclusive.
        :param valid_chars: regular expression the WHOLE value must match,
            or ``"*"`` to skip the character check.
        :param raise_exception: raise ``ValueError`` on invalid input instead
            of returning ``None``.
        :param field_name: field name, used only in error messages.
        :param deobfuscate_email: unused here; accepted so that every field
            parser can be called with the same keyword arguments.
        :return: ``value`` unchanged when valid, otherwise ``None``.
        :raises ValueError: if the value is invalid and ``raise_exception``
            is true.
        """
        value_length = len(value)
        if value_length < min_length:
            if raise_exception:
                raise ValueError("value of field '{0}' is too short".format(field_name))
            return None
        if value_length > max_length:
            if raise_exception:
                raise ValueError("value of field '{0}' is too long".format(field_name))
            return None
        # fullmatch, not search: every character has to be allowed, not just
        # one substring somewhere inside the value.
        if valid_chars != "*" and not re.fullmatch(valid_chars, value):
            if raise_exception:
                raise ValueError("value of field '{0}' doesn't match valid chars restrictions".format(field_name))
            return None
        return value

    def _parse_email_value(self, value, field_name, raise_exception, deobfuscate_email):
        """Validate an e-mail style field (``email``, ``abuse``, ``xmpp``).

        ``"[]"`` in the input is treated as an obfuscated ``"@"``.

        :param value: raw field value.
        :param field_name: field name, used only in error messages.
        :param raise_exception: raise ``ValueError`` on invalid input instead
            of returning ``None``.
        :param deobfuscate_email: when true return the address with ``"@"``,
            otherwise return it with ``"@"`` written as ``"[]"``.
        :return: the (de)obfuscated address, or ``None`` when invalid.
        :raises ValueError: if the value is invalid and ``raise_exception``
            is true.
        """
        if value:
            address = value.replace("[]", "@")
            if re.search(self.email_regex, address):
                return address if deobfuscate_email else address.replace("@", "[]")
        if raise_exception:
            raise ValueError("value of field '{0}' is not a valid email address".format(field_name))
        return None

    # Field name -> {"fn": parser function, "args": fixed keyword arguments}.
    # "valid_chars" patterns are matched against the whole value, so each one
    # describes the complete value (hence the trailing "+" on multi-character
    # fields such as confmgmt and dnslocation).
    _supported_fields_parsers = {
        "email": {"fn": _parse_email_value, "args": {}},
        "url": {"fn": _parse_string_value,
                "args": {"min_length": 4, "max_length": 399, "valid_chars": "[_%/:a-zA-Z0-9.-]+"}},
        "proof": {"fn": _parse_string_value,
                  "args": {"min_length": 7, "max_length": 7, "valid_chars": "[adinrsu-]+"}},
        "ciissversion": {"fn": _parse_string_value,
                         "args": {"min_length": 1, "max_length": 1, "valid_chars": "[12]+"}},
        "pgp": {"fn": _parse_string_value,
                "args": {"min_length": 40, "max_length": 40, "valid_chars": "[a-zA-Z0-9]+"}},
        "abuse": {"fn": _parse_email_value, "args": {}},
        "keybase": {"fn": _parse_string_value,
                    "args": {"min_length": 0, "max_length": 50, "valid_chars": "[a-zA-Z0-9]+"}},
        "twitter": {"fn": _parse_string_value,
                    "args": {"min_length": 1, "max_length": 15, "valid_chars": "[a-zA-Z0-9_]+"}},
        "mastodon": {"fn": _parse_string_value,
                     "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"}},
        "matrix": {"fn": _parse_string_value,
                   "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"}},
        "xmpp": {"fn": _parse_email_value, "args": {}},
        "otr3": {"fn": _parse_string_value,
                 "args": {"min_length": 40, "max_length": 40, "valid_chars": "[a-z0-9]+"}},
        "hoster": {"fn": _parse_string_value,
                   "args": {"min_length": 0, "max_length": 254, "valid_chars": "[a-zA-Z0-9.-]+"}},
        "cost": {"fn": _parse_string_value,
                 "args": {"min_length": 0, "max_length": 13, "valid_chars": "[A-Z0-9.]+"}},
        "uplinkbw": {"fn": _parse_string_value,
                     "args": {"min_length": 0, "max_length": 7, "valid_chars": "[0-9]+"}},
        "trafficacct": {"fn": _parse_string_value,
                        "args": {"min_length": 0, "max_length": 9, "valid_chars": "[unmetrd0-9]+"}},
        "memory": {"fn": _parse_string_value,
                   "args": {"min_length": 0, "max_length": 10, "valid_chars": "[0-9]+"}},
        "cpu": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 50, "valid_chars": "[a-zA-Z0-9_-]+"}},
        "virtualization": {"fn": _parse_string_value,
                           "args": {"min_length": 0, "max_length": 15, "valid_chars": "[a-z-]+"}},
        "donationurl": {"fn": _parse_string_value,
                        "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"}},
        "btc": {"fn": _parse_string_value,
                "args": {"min_length": 26, "max_length": 99, "valid_chars": "[a-zA-Z0-9]+"}},
        "zec": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 95, "valid_chars": "[a-zA-Z0-9]+"}},
        "xmr": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 99, "valid_chars": "[a-zA-Z0-9]+"}},
        "offlinemasterkey": {"fn": _parse_string_value,
                             "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "signingkeylifetime": {"fn": _parse_string_value,
                               "args": {"min_length": 0, "max_length": 6, "valid_chars": "[0-9]+"}},
        "sandbox": {"fn": _parse_string_value,
                    "args": {"min_length": 1, "max_length": 2, "valid_chars": "[yn]"}},
        "os": {"fn": _parse_string_value,
               "args": {"min_length": 0, "max_length": 20, "valid_chars": "[A-Za-z0-9/.]+"}},
        "tls": {"fn": _parse_string_value,
                "args": {"min_length": 0, "max_length": 14, "valid_chars": "[a-z]+"}},
        "aesni": {"fn": _parse_string_value,
                  "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "autoupdate": {"fn": _parse_string_value,
                       "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "confmgmt": {"fn": _parse_string_value,
                     "args": {"min_length": 1, "max_length": 15, "valid_chars": "[a-zA-Z-]+"}},
        "dnslocation": {"fn": _parse_string_value,
                        "args": {"min_length": 5, "max_length": 100, "valid_chars": "[a-z,]+"}},
        "dnsqname": {"fn": _parse_string_value,
                     "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "dnssec": {"fn": _parse_string_value,
                   "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
        "dnslocalrootzone": {"fn": _parse_string_value,
                             "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"}},
    }

    def __init__(self):
        pass

    def parse(self, value: str, raise_exception_on_invalid_value=False, deobfuscate_email=True) -> dict:
        """Parse a ContactInfo string.

        :param value: the full ContactInfo string.
        :param raise_exception_on_invalid_value: raise ``ValueError`` for an
            invalid field value instead of storing ``None`` for that field.
        :param deobfuscate_email: return e-mail style fields with ``"@"``
            rather than the obfuscated ``"[]"`` form.
        :return: dict mapping each recognised field name to its validated
            value (``None`` when the value was rejected). Returns ``None``
            instead of a dict when the string has no ``ciissversion:`` field.
        :raises ValueError: see ``raise_exception_on_invalid_value``.
        """
        # the ciissversion field is mandatory
        if not value or 'ciissversion:' not in value:
            return None

        result = {}
        # split() with no argument breaks on any whitespace run, so tabs and
        # trailing newlines (e.g. from piped input) never end up in a value.
        for token in value.split():
            name, separator, data = token.partition(":")
            if not separator or name not in self._supported_fields_parsers:
                continue

            field_parser = self._supported_fields_parsers[name]
            if field_parser is None:
                result[name] = data
                continue

            if callable(field_parser):
                parsed = field_parser(self, data)
            else:
                # Build the call arguments in a fresh dict so the shared,
                # class-level table is never modified by a parse call.
                call_args = dict(
                    field_parser["args"],
                    field_name=name,
                    value=data,
                    raise_exception=raise_exception_on_invalid_value,
                    deobfuscate_email=deobfuscate_email,
                )
                parsed = field_parser["fn"](self, **call_args)

            # The first truthy value of a repeated field wins.
            if not result.get(name):
                result[name] = parsed
        return result
def cmd_parse(opts: argparse.Namespace):
    """
    ArgParser function for parsing a single ContactInfo string, and outputting it as JSON (or python-style dict's)

    The contact string is taken from ``opts.contact`` (joined with spaces); when
    that is empty or starts with ``'-'`` it is read from stdin instead. In both
    cases surrounding whitespace is stripped before parsing.

    With ``opts.pretty`` false the result is printed to stdout as compact JSON.
    Otherwise the result (JSON-encoded first when ``opts.json`` is set) is handed
    to ``rprint``. Returns ``None``.
    """
    if not opts.contact or opts.contact[0] == '-':
        # Strip here as well, so piped input with a trailing newline is
        # treated the same as a contact given on the command line.
        contact = sys.stdin.read().strip()
    else:
        contact = ' '.join(opts.contact).strip()

    res = TorContactInfoParser().parse(contact)
    if not opts.pretty:
        return print(json.dumps(res))

    # opts.pretty is known to be true from here on.
    if opts.json:
        res = json.dumps(res, indent=4)
    # if not HAS_RICH: res = json.dumps(res, indent=4)
    # NOTE(review): rprint() in this file logs at DEBUG level, so pretty output
    # is only visible when debug logging is enabled - confirm this is intended.
    rprint(res)
def cmd_scan(opts: argparse.Namespace, adata=None) -> int:
    """
    ArgParser function for scanning all ContactInfo strings from ``https://onionoo.torproject.org/details`` ,
    and outputting each one as a Python-style Dict, or JSON.

    :param opts: parsed command-line options; ``opts.json`` and ``opts.pretty`` are read.
    :param adata: optional JSON text holding a ``relays`` list. When empty or
        ``None`` the relay details are downloaded instead.
    :return: ``0`` on success, ``1`` when the download could not be decoded as
        JSON or no relays were found.
    """
    parser = TorContactInfoParser()
    surl = "https://onionoo.torproject.org/details"
    # Seconds to wait for the server before giving up; without a timeout
    # requests.get() may block indefinitely.
    timeout = 60

    if not adata:
        LOG.info(f"Getting relays from {surl}")
        jdata = requests.get(surl, timeout=timeout)
        try:
            adata = jdata.json()
        except ValueError as e:
            # JSON decode errors (json / simplejson / requests) derive from ValueError.
            LOG.exception(f"JSON error {e}")
            return 1
        elts = adata["relays"]
    else:
        elts = json.loads(adata)['relays']

    if not elts:
        LOG.warning("NO relays - are we connected?")
        return 1
    LOG.info(f"{len(elts)} relays")

    for relay in elts:
        if 'fingerprint' not in relay:
            LOG.warning(f"fingerprint not in relay for {relay}")
            continue
        fp = relay['fingerprint']
        verified_host_names = relay.get('verified_host_names', [])
        contact = relay.get("contact", None)
        if not contact:
            LOG.warning(f"No contact for {fp} {verified_host_names}")
            continue
        if 'ciissversion' not in contact:
            LOG.debug(f"No ciissversion in contact in {fp}")
            continue

        LOG.debug(f"parsing {fp}")
        result = parser.parse(contact, False)
        if not result:
            LOG.warning(f"No result for {contact} in {fp}")
            continue

        # result is a non-empty dict at this point.
        if opts.json:
            result = json.dumps(result, indent=4) if opts.pretty else json.dumps(result)
        if opts.pretty:
            rprint(result)
        else:
            LOG.debug(result)
    return 0
ETC_DIR = '/etc/tor/yaml'


def oparser():
    """Build and return the command-line argument parser.

    The top-level parser defaults to ``func=cmd_scan`` and takes
    ``--relays_output`` and ``-j/--json``. Two optional sub-commands are
    registered: ``parse`` (sets ``func=cmd_parse``) and ``scan``.
    """
    # The example text is kept flush-left so the help output is unchanged.
    usage_examples = textwrap.dedent(f"""
Examples:
# 'scan' is the original behaviour of this script. It iterates over the data
# from https://onionoo.torproject.org/details , parses each contact, and prints it as Python dict-style JSON.
{sys.argv[0]} scan
# Same as previous. With no arguments, it's equivalent to running 'scan'.
{sys.argv[0]}
# If you pass '-p' after scan, it will enable pretty printing. For best pretty printing,
# make sure you have 'rich' installed from pypi.
{sys.argv[0]} scan -p
# If you need real JSON with double quotes, rather than Python dict-style JSON, you can
# use the '-j' flag to enable "real JSON" mode (you can combine with '-p' if you want pretty printed real json)
{sys.argv[0]} scan -j
# Using 'parse', you can parse an arbitrary ContactInfo string, and it will output the parsed result
# with pretty printing by default.
{sys.argv[0]} parse "contact Privex Inc. email:noc[]privex.io url:https://www.privex.io " \\
"proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc hoster:www.privex.io " \\
"uplinkbw:500 memory:4096 virtualization:kvm btc:bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2 " \\
"xmr:89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps"
{{
'email': 'noc@privex.io',
'url': 'https://www.privex.io',
'proof': 'uri-rsa',
'pgp': None,
'keybase': 'privexinc',
'twitter': 'PrivexInc',
'hoster': 'www.privex.io',
'uplinkbw': '500',
'memory': '4096',
'virtualization': 'kvm',
'btc': 'bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2',
'xmr': '89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps'
}}
# You can also pipe a contact string into 'parse', and it will work just the same.
echo "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc" | {sys.argv[0]} parse
{{'email': 'noc@privex.io', 'url': 'https://www.privex.io', 'proof': 'uri-rsa', 'pgp': None, 'keybase': 'privexinc', 'twitter': 'PrivexInc\n'}}
# If you need real JSON outputted, rather than Python dict-style output, you can pass -j to either 'parse' or 'scan'
{sys.argv[0]} parse -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
{{
"email": "noc@privex.io",
"url": "https://www.privex.io",
"proof": "uri-rsa",
"pgp": null,
"keybase": "privexinc",
"twitter": "PrivexInc"
}}
# You can use '-np' to disable pretty printing for 'parse' - you can combine it with '-j' to get flat, plain JSON.
{sys.argv[0]} parse -np -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
{{"email": "noc@privex.io", "url": "https://www.privex.io", "proof": "uri-rsa", "pgp": null, "keybase": "privexinc", "twitter": "PrivexInc"}}
""")

    top = argparse.ArgumentParser(
        description=usage_examples,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Options accepted before any sub-command.
    default_relays_file = os.path.join(ETC_DIR, 'relays.json')
    top.add_argument(
        '--relays_output',
        dest='relays_output',
        type=str,
        default=default_relays_file,
        help="Write the download relays in json to a file",
    )
    top.add_argument(
        '-j', '--json',
        dest='json',
        action='store_true',
        default=False,
        help="Output real JSON, not Python dict format.",
    )
    # With no sub-command given, the namespace carries func=cmd_scan.
    top.set_defaults(func=cmd_scan, json=False, pretty=False)

    commands = top.add_subparsers()
    commands.required = False

    # 'parse' sub-command
    parse_cmd = commands.add_parser(
        'parse',
        help="Parse a single contact string, either as an argument, or piped into stdin",
    )
    parse_cmd.add_argument('contact', nargs='*')
    # NOTE(review): store_false combined with default=False leaves 'pretty'
    # False whether or not -np is passed - confirm the intended default.
    parse_cmd.add_argument(
        '-np', '--no-pretty',
        dest='pretty',
        action='store_false',
        default=False,
        help="Disable pretty printing JSON",
    )
    parse_cmd.set_defaults(func=cmd_parse)

    # 'scan' sub-command
    scan_cmd = commands.add_parser(
        'scan',
        help="Parse all contacts from https://onionoo.torproject.org/details",
    )
    scan_cmd.add_argument(
        '-p',
        dest='pretty',
        action='store_true',
        default=False,
        help="Enable pretty printing JSON",
    )
    scan_cmd.add_argument(
        '-j', '--json',
        dest='json',
        action='store_true',
        default=False,
        help="Output real JSON, not Python dict format.",
    )
    # scan_cmd.set_defaults(func=cmd_scan)
    return top
def iMain(lArgs=None):
    """Parse the command line and run the selected sub-command.

    :param lArgs: argument list to parse; ``None`` lets argparse fall back to
        ``sys.argv[1:]``.
    :return: the value returned by the command function that was run.

    The command function is taken from ``opts.func`` as set by the argument
    parser, so ``parse`` runs ``cmd_parse`` and everything else runs
    ``cmd_scan``. For a scan, the file named by ``--relays_output`` is read
    (when it exists) and its text is passed to ``cmd_scan`` as the relay data.
    """
    cparser = oparser()
    opts = cparser.parse_args(lArgs)

    func = getattr(opts, 'func', cmd_scan)
    if func is not cmd_scan:
        # Sub-commands other than scan only take the parsed options.
        return func(opts)

    data = None
    if opts.relays_output and os.path.exists(opts.relays_output):
        # Context manager so the file handle is closed promptly.
        with open(opts.relays_output, 'rt') as relays_file:
            data = relays_file.read()
    return cmd_scan(opts, data)
if __name__ == "__main__":
    # Project-local helper; presumably attaches handlers/formatting to LOG - verify in exclude_utils.
    from exclude_utils import vsetup_logging

    # Any non-empty DEBUG environment variable switches on debug logging.
    log_level = logging.DEBUG if os.environ.get('DEBUG', '') else logging.INFO
    vsetup_logging(LOG, log_level)

    try:
        i = iMain(sys.argv[1:])
    except KeyboardInterrupt:
        # A user interrupt is treated as a normal exit.
        i = 0
    except Exception as e:
        # Also covers requests.exceptions.ProxyError, which is an Exception subclass.
        LOG.exception(f"Exception: {e}")
        # Report the failure to the caller instead of exiting with status 0.
        i = 1
    sys.exit(i)