add type hints (Python 3.11)
This commit is contained in:
@@ -52,15 +52,16 @@ import sys
|
||||
import urllib.parse as urlparse
|
||||
from enum import Enum
|
||||
from operator import add
|
||||
from typing import TextIO, Any, TypedDict
|
||||
|
||||
from qrcode import QRCode # type: ignore
|
||||
|
||||
import protobuf_generated_python.google_auth_pb2 # type: ignore
|
||||
import protobuf_generated_python.google_auth_pb2 as migration_protobuf
|
||||
|
||||
|
||||
try:
|
||||
import cv2 # type: ignore
|
||||
import numpy # type: ignore
|
||||
import numpy
|
||||
|
||||
try:
|
||||
import pyzbar.pyzbar as zbar # type: ignore
|
||||
@@ -75,8 +76,17 @@ Exception: {e}""")
|
||||
except ImportError:
|
||||
qreader_available = False
|
||||
|
||||
verbose: int
|
||||
quiet: bool
|
||||
# Types
|
||||
Args = argparse.Namespace
|
||||
OtpUrl = str
|
||||
Otp = TypedDict('Otp', {'name': str, 'secret': str, 'issuer': str, 'type': str, 'counter': int | None, 'url': OtpUrl})
|
||||
Otps = list[Otp]
|
||||
OtpUrls = list[OtpUrl]
|
||||
|
||||
|
||||
# Global variable declaration
|
||||
verbose: int = 0
|
||||
quiet: bool = False
|
||||
|
||||
|
||||
def sys_main() -> None:
|
||||
@@ -95,7 +105,7 @@ def main(sys_args: list[str]) -> None:
|
||||
write_json(args, otps)
|
||||
|
||||
|
||||
def parse_args(sys_args: list[str]) -> argparse.Namespace:
|
||||
def parse_args(sys_args: list[str]) -> Args:
|
||||
global verbose, quiet
|
||||
description_text = "Extracts one time password (OTP) secret keys from QR codes, e.g. from Google Authenticator app."
|
||||
if qreader_available:
|
||||
@@ -133,17 +143,17 @@ b) image file containing a QR code or = for stdin for an image containing a QR c
|
||||
return args
|
||||
|
||||
|
||||
def extract_otps(args):
|
||||
def extract_otps(args: Args) -> Otps:
|
||||
if not args.infile:
|
||||
return extract_otps_from_camera(args)
|
||||
else:
|
||||
return extract_otps_from_files(args)
|
||||
|
||||
|
||||
def extract_otps_from_camera(args):
|
||||
def extract_otps_from_camera(args: Args) -> Otps:
|
||||
if verbose: print("Capture QR codes from camera")
|
||||
otp_urls = []
|
||||
otps = []
|
||||
otp_urls: OtpUrls = []
|
||||
otps: Otps = []
|
||||
|
||||
QRMode = Enum('QRMode', ['QREADER', 'DEEP_QREADER', 'CV2'], start=0)
|
||||
qr_mode = QRMode.QREADER
|
||||
@@ -215,7 +225,7 @@ def extract_otps_from_camera(args):
|
||||
return otps
|
||||
|
||||
|
||||
def extract_otps_from_otp_url(otp_url, otp_urls, otps, args):
|
||||
def extract_otps_from_otp_url(otp_url: str, otp_urls: OtpUrls, otps: Otps, args: Args) -> None:
|
||||
if otp_url and verbose: print(otp_url)
|
||||
if otp_url and otp_url not in otp_urls:
|
||||
otp_urls.append(otp_url)
|
||||
@@ -223,8 +233,8 @@ def extract_otps_from_otp_url(otp_url, otp_urls, otps, args):
|
||||
if verbose: print(f"{len(otps)} otp{'s'[:len(otps) != 1]} from {len(otp_urls)} QR code{'s'[:len(otp_urls) != 1]} extracted")
|
||||
|
||||
|
||||
def extract_otps_from_files(args):
|
||||
otps = []
|
||||
def extract_otps_from_files(args: Args) -> Otps:
|
||||
otps: Otps = []
|
||||
|
||||
i = j = k = 0
|
||||
if verbose: print(f"Input files: {args.infile}")
|
||||
@@ -240,7 +250,7 @@ def extract_otps_from_files(args):
|
||||
return otps
|
||||
|
||||
|
||||
def get_otp_urls_from_file(filename):
|
||||
def get_otp_urls_from_file(filename: str) -> OtpUrls:
|
||||
# stdin stream cannot be rewinded, thus distinguish, use - for utf-8 stdin and = for binary image stdin
|
||||
if filename != '=':
|
||||
check_file_exists(filename)
|
||||
@@ -255,7 +265,7 @@ def get_otp_urls_from_file(filename):
|
||||
return []
|
||||
|
||||
|
||||
def read_lines_from_text_file(filename):
|
||||
def read_lines_from_text_file(filename: str) -> list[str]:
|
||||
if verbose: print(f"Reading lines of {filename}")
|
||||
finput = fileinput.input(filename)
|
||||
try:
|
||||
@@ -268,18 +278,18 @@ def read_lines_from_text_file(filename):
|
||||
lines.append(line)
|
||||
if not lines:
|
||||
eprint(f"WARN: {filename.replace('-', 'stdin')} is empty")
|
||||
return lines
|
||||
except UnicodeDecodeError:
|
||||
if filename == '-':
|
||||
abort("\nERROR: Unable to open text file form stdin. "
|
||||
"In case you want read an image file from stdin, you must use '=' instead of '-'.")
|
||||
else: # The file is probably an image, process below
|
||||
return None
|
||||
return []
|
||||
finally:
|
||||
finput.close()
|
||||
return lines
|
||||
|
||||
|
||||
def extract_otp_from_otp_url(otpauth_migration_url, otps, i, j, infile, args):
|
||||
def extract_otp_from_otp_url(otpauth_migration_url: str, otps: Otps, i: int, j: int, infile: str, args: Args) -> int:
|
||||
payload = get_payload_from_otp_url(otpauth_migration_url, i, infile)
|
||||
|
||||
# pylint: disable=no-member
|
||||
@@ -290,7 +300,7 @@ def extract_otp_from_otp_url(otpauth_migration_url, otps, i, j, infile, args):
|
||||
if verbose: print('OTP enum type:', get_enum_name_by_number(raw_otp, 'type'))
|
||||
otp_type = get_otp_type_str_from_code(raw_otp.type)
|
||||
otp_url = build_otp_url(secret, raw_otp)
|
||||
otp = {
|
||||
otp: Otp = {
|
||||
"name": raw_otp.name,
|
||||
"secret": secret,
|
||||
"issuer": raw_otp.issuer,
|
||||
@@ -311,7 +321,7 @@ def extract_otp_from_otp_url(otpauth_migration_url, otps, i, j, infile, args):
|
||||
return j
|
||||
|
||||
|
||||
def convert_img_to_otp_url(filename):
|
||||
def convert_img_to_otp_url(filename: str) -> OtpUrls:
|
||||
if verbose: print(f"Reading image {filename}")
|
||||
try:
|
||||
if filename != '=':
|
||||
@@ -321,7 +331,7 @@ def convert_img_to_otp_url(filename):
|
||||
stdin = sys.stdin.buffer.read()
|
||||
except AttributeError:
|
||||
# Workaround for pytest, since pytest cannot monkeypatch sys.stdin.buffer
|
||||
stdin = sys.stdin.read()
|
||||
stdin = sys.stdin.read() # type: ignore # Workaround for pytest fixtures
|
||||
if not stdin:
|
||||
eprint("WARN: stdin is empty")
|
||||
try:
|
||||
@@ -338,13 +348,12 @@ def convert_img_to_otp_url(filename):
|
||||
decoded_text = QReader().detect_and_decode(img)
|
||||
if decoded_text is None:
|
||||
abort(f"\nERROR: Unable to read QR Code from file.\ninput file: {filename}")
|
||||
|
||||
return [decoded_text]
|
||||
except Exception as e:
|
||||
abort(f"\nERROR: Encountered exception '{e}'.\ninput file: {filename}")
|
||||
return [decoded_text]
|
||||
|
||||
|
||||
def get_payload_from_otp_url(otpauth_migration_url, i, input_source):
|
||||
def get_payload_from_otp_url(otpauth_migration_url: str, i: int, input_source: str) -> migration_protobuf.MigrationPayload:
|
||||
if not otpauth_migration_url.startswith('otpauth-migration://'):
|
||||
eprint(f"\nWARN: line is not a otpauth-migration:// URL\ninput: {input_source}\nline '{otpauth_migration_url}'\nProbably a wrong file was given")
|
||||
parsed_url = urlparse.urlparse(otpauth_migration_url)
|
||||
@@ -352,7 +361,7 @@ def get_payload_from_otp_url(otpauth_migration_url, i, input_source):
|
||||
try:
|
||||
params = urlparse.parse_qs(parsed_url.query, strict_parsing=True)
|
||||
except Exception: # Not necessary for Python >= 3.11
|
||||
params = []
|
||||
params = {}
|
||||
if verbose > 2: print(f"\nDEBUG: querystring params={params}")
|
||||
if 'data' not in params:
|
||||
abort(f"\nERROR: no data query parameter in input URL\ninput file: {input_source}\nline '{otpauth_migration_url}'\nProbably a wrong file was given")
|
||||
@@ -361,7 +370,7 @@ def get_payload_from_otp_url(otpauth_migration_url, i, input_source):
|
||||
data_base64_fixed = data_base64.replace(' ', '+')
|
||||
if verbose > 2: print(f"\nDEBUG: data_base64_fixed={data_base64_fixed}")
|
||||
data = base64.b64decode(data_base64_fixed, validate=True)
|
||||
payload = protobuf_generated_python.google_auth_pb2.MigrationPayload()
|
||||
payload = migration_protobuf.MigrationPayload()
|
||||
try:
|
||||
payload.ParseFromString(data)
|
||||
except Exception:
|
||||
@@ -374,28 +383,28 @@ def get_payload_from_otp_url(otpauth_migration_url, i, input_source):
|
||||
|
||||
|
||||
# https://stackoverflow.com/questions/40226049/find-enums-listed-in-python-descriptor-for-protobuf
|
||||
def get_enum_name_by_number(parent, field_name):
|
||||
def get_enum_name_by_number(parent: Any, field_name: str) -> str:
|
||||
field_value = getattr(parent, field_name)
|
||||
return parent.DESCRIPTOR.fields_by_name[field_name].enum_type.values_by_number.get(field_value).name
|
||||
return parent.DESCRIPTOR.fields_by_name[field_name].enum_type.values_by_number.get(field_value).name # type: ignore # generic code
|
||||
|
||||
|
||||
def get_otp_type_str_from_code(otp_type):
|
||||
def get_otp_type_str_from_code(otp_type: int) -> str:
|
||||
return 'totp' if otp_type == 2 else 'hotp'
|
||||
|
||||
|
||||
def convert_secret_from_bytes_to_base32_str(bytes):
|
||||
def convert_secret_from_bytes_to_base32_str(bytes: bytes) -> str:
|
||||
return str(base64.b32encode(bytes), 'utf-8').replace('=', '')
|
||||
|
||||
|
||||
def build_otp_url(secret, raw_otp):
|
||||
def build_otp_url(secret: str, raw_otp: migration_protobuf.MigrationPayload.OtpParameters) -> str:
|
||||
url_params = {'secret': secret}
|
||||
if raw_otp.type == 1: url_params['counter'] = raw_otp.counter
|
||||
if raw_otp.type == 1: url_params['counter'] = str(raw_otp.counter)
|
||||
if raw_otp.issuer: url_params['issuer'] = raw_otp.issuer
|
||||
otp_url = f"otpauth://{get_otp_type_str_from_code(raw_otp.type)}/{urlparse.quote(raw_otp.name)}?" + urlparse.urlencode(url_params)
|
||||
return otp_url
|
||||
|
||||
|
||||
def print_otp(otp):
|
||||
def print_otp(otp: Otp) -> None:
|
||||
print(f"Name: {otp['name']}")
|
||||
print(f"Secret: {otp['secret']}")
|
||||
if otp['issuer']: print(f"Issuer: {otp['issuer']}")
|
||||
@@ -406,7 +415,7 @@ def print_otp(otp):
|
||||
print(otp['url'])
|
||||
|
||||
|
||||
def save_qr(otp, args, j):
|
||||
def save_qr(otp: Otp, args: Args, j: int) -> str:
|
||||
dir = args.saveqr
|
||||
if not (os.path.exists(dir)): os.makedirs(dir, exist_ok=True)
|
||||
pattern = re.compile(r'[\W_]+')
|
||||
@@ -416,21 +425,21 @@ def save_qr(otp, args, j):
|
||||
return file_otp_issuer
|
||||
|
||||
|
||||
def save_qr_file(args, data, name):
|
||||
def save_qr_file(args: Args, otp_url: OtpUrl, name: str) -> None:
|
||||
qr = QRCode()
|
||||
qr.add_data(data)
|
||||
qr.add_data(otp_url)
|
||||
img = qr.make_image(fill_color='black', back_color='white')
|
||||
if verbose: print(f"Saving to {name}")
|
||||
img.save(name)
|
||||
|
||||
|
||||
def print_qr(args, data):
|
||||
def print_qr(args: Args, otp_url: str) -> None:
|
||||
qr = QRCode()
|
||||
qr.add_data(data)
|
||||
qr.add_data(otp_url)
|
||||
qr.print_ascii()
|
||||
|
||||
|
||||
def write_csv(args, otps):
|
||||
def write_csv(args: Args, otps: Otps) -> None:
|
||||
if args.csv and len(otps) > 0:
|
||||
with open_file_or_stdout_for_csv(args.csv) as outfile:
|
||||
writer = csv.DictWriter(outfile, otps[0].keys())
|
||||
@@ -439,7 +448,7 @@ def write_csv(args, otps):
|
||||
if not quiet: print(f"Exported {len(otps)} otp{'s'[:len(otps) != 1]} to csv {args.csv}")
|
||||
|
||||
|
||||
def write_keepass_csv(args, otps):
|
||||
def write_keepass_csv(args: Args, otps: Otps) -> None:
|
||||
if args.keepass and len(otps) > 0:
|
||||
has_totp = has_otp_type(otps, 'totp')
|
||||
has_hotp = has_otp_type(otps, 'hotp')
|
||||
@@ -479,34 +488,34 @@ def write_keepass_csv(args, otps):
|
||||
if count_hotp_entries > 0: print(f"Exported {count_hotp_entries} hotp entrie{'s'[:count_hotp_entries != 1]} to keepass csv file {otp_filename_hotp}")
|
||||
|
||||
|
||||
def write_json(args, otps):
|
||||
def write_json(args: Args, otps: Otps) -> None:
|
||||
if args.json:
|
||||
with open_file_or_stdout(args.json) as outfile:
|
||||
json.dump(otps, outfile, indent=4)
|
||||
if not quiet: print(f"Exported {len(otps)} otp{'s'[:len(otps) != 1]} to json {args.json}")
|
||||
|
||||
|
||||
def has_otp_type(otps, otp_type):
|
||||
def has_otp_type(otps: Otps, otp_type: str) -> bool:
|
||||
for otp in otps:
|
||||
if otp['type'] == otp_type:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def add_pre_suffix(file, pre_suffix):
|
||||
def add_pre_suffix(file: str, pre_suffix: str) -> str:
|
||||
'''filename.ext, pre -> filename.pre.ext'''
|
||||
name, ext = os.path.splitext(file)
|
||||
return name + "." + pre_suffix + (ext if ext else "")
|
||||
|
||||
|
||||
def open_file_or_stdout(filename):
|
||||
def open_file_or_stdout(filename: str) -> TextIO:
|
||||
'''stdout is denoted as "-".
|
||||
Note: Set before the following line:
|
||||
sys.stdout.close = lambda: None'''
|
||||
return open(filename, "w", encoding='utf-8') if filename != '-' else sys.stdout
|
||||
|
||||
|
||||
def open_file_or_stdout_for_csv(filename):
|
||||
def open_file_or_stdout_for_csv(filename: str) -> TextIO:
|
||||
'''stdout is denoted as "-".
|
||||
newline=''
|
||||
Note: Set before the following line:
|
||||
@@ -514,13 +523,13 @@ def open_file_or_stdout_for_csv(filename):
|
||||
return open(filename, "w", encoding='utf-8', newline='') if filename != '-' else sys.stdout
|
||||
|
||||
|
||||
def check_file_exists(filename):
|
||||
def check_file_exists(filename: str) -> None:
|
||||
if filename != '-' and not os.path.isfile(filename):
|
||||
abort(f"\nERROR: Input file provided is non-existent or not a file."
|
||||
f"\ninput file: {filename}")
|
||||
|
||||
|
||||
def is_binary(line):
|
||||
def is_binary(line: str) -> bool:
|
||||
try:
|
||||
line.startswith('#')
|
||||
return False
|
||||
@@ -528,12 +537,12 @@ def is_binary(line):
|
||||
return True
|
||||
|
||||
|
||||
def eprint(*args, **kwargs):
|
||||
def eprint(*args: Any, **kwargs: Any) -> None:
|
||||
'''Print to stderr.'''
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
|
||||
|
||||
def abort(*args, **kwargs):
|
||||
def abort(*args: Any, **kwargs: Any) -> None:
|
||||
eprint(*args, **kwargs)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user