From aba508e8269cb29ceaec4aef17e48e14c94421ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kyle=20=F0=9F=90=86?= Date: Sat, 20 Dec 2025 19:44:24 -0500 Subject: [PATCH] fix: add path validation, file permissions, input validation, and specific exceptions --- bpub.py | 69 +++++++++++++++++++++++++++++++++++++----------------- indexer.py | 12 ++++++---- wizard.py | 1 + 3 files changed, 57 insertions(+), 25 deletions(-) diff --git a/bpub.py b/bpub.py index ac5e8e7..ba1d910 100644 --- a/bpub.py +++ b/bpub.py @@ -449,10 +449,9 @@ def decode_stream(payload: bytes): if "bpub_id" in meta_json: try: meta["bpub_id"] = bytes.fromhex(meta_json["bpub_id"]) - except Exception: - # if it doesn't parse, leave as-is in hex form + except ValueError: meta["bpub_id"] = meta_json["bpub_id"] - except Exception: + except (zlib.error, json.JSONDecodeError, UnicodeDecodeError): meta["meta_blob"] = meta_blob_enc content_plain = _xor_obfuscate(content_enc) @@ -711,13 +710,27 @@ def chunk_data_pubkeys(pubkeys): yield chunk +def validate_file_path(path: str) -> str: + if os.path.isabs(path) or ".." in path.split(os.path.sep): + raise ValueError(f"Path traversal not allowed: {path}") + return path + + +def positive_int(value: str) -> int: + ivalue = int(value) + if ivalue <= 0: + raise argparse.ArgumentTypeError(f"must be positive: {value}") + return ivalue + + def load_psbt_from_arg(psbt_arg: str) -> PSBT: """ Helper: load a PSBT either from a file path or from a base64 string. """ - # If it's a file on disk, read its contents if os.path.isfile(psbt_arg): - b64 = open(psbt_arg, "r").read().strip() + validate_file_path(psbt_arg) + with open(psbt_arg, "r") as f: + b64 = f.read().strip() else: b64 = psbt_arg.strip() @@ -733,7 +746,9 @@ def cmd_encode(args): """ encode: file -> BPUB stream hex (just a helper) """ - data = open(args.file, "rb").read() + validate_file_path(args.file) + with open(args.file, "rb") as f: + data = f.read() if args.legacy_v3: stream = build_stream_v3_5(data, args.mime, args.filename, args.compress) elif args.v4: @@ -748,7 +763,9 @@ def cmd_decode(args): decode: BPUB stream hex -> raw file bytes """ if os.path.isfile(args.stream_hex): - hexdata = open(args.stream_hex, "r").read().strip() + validate_file_path(args.stream_hex) + with open(args.stream_hex, "r") as f: + hexdata = f.read().strip() else: hexdata = args.stream_hex.strip() @@ -786,7 +803,9 @@ def cmd_txbuild(args): if len(control_pubkey) != 33: sys.exit("control-pubkey must be 33-byte compressed pubkey hex") - data = open(args.file, "rb").read() + validate_file_path(args.file) + with open(args.file, "rb") as f: + data = f.read() if args.legacy_v3 and args.v4: sys.exit("Cannot specify both --legacy-v3 and --v4") @@ -913,7 +932,9 @@ def cmd_fundpsbt(args): if len(control_pubkey) != 33: sys.exit("control-pubkey must be 33-byte compressed pubkey hex") - data = open(args.file, "rb").read() + validate_file_path(args.file) + with open(args.file, "rb") as f: + data = f.read() if args.legacy_v3 and args.v4: sys.exit("Cannot specify both --legacy-v3 and --v4") @@ -1069,7 +1090,9 @@ def cmd_revealpsbt(args): if not args.bpub_utxo: sys.exit("Need at least one --bpub-utxo TXID:VOUT:VALUE") - data = open(args.file, "rb").read() + validate_file_path(args.file) + with open(args.file, "rb") as f: + data = f.read() if args.legacy_v3: stream = build_stream_v3_5(data, args.mime, args.filename, args.compress) version_label = "v3.5 legacy" @@ -1375,9 +1398,10 @@ def cmd_txrecover(args): - After collecting all data pubkeys, we decode them back into a BPUB stream and then into the original file (v3.5, v4, or v5). """ - if os.path.isfile(args.rawtx): - rawtx_hex = open(args.rawtx, "r").read().strip() + validate_file_path(args.rawtx) + with open(args.rawtx, "r") as f: + rawtx_hex = f.read().strip() else: rawtx_hex = args.rawtx.strip() @@ -1543,7 +1567,9 @@ def cmd_ownertransferpsbt(args): ) if args.file: - data = open(args.file, "rb").read() + validate_file_path(args.file) + with open(args.file, "rb") as f: + data = f.read() bpub_id = compute_bpub_v5_id(data) else: try: @@ -1619,9 +1645,10 @@ def cmd_decodetransfer(args): - owner P2WSH address (the script hash used for the owner UTXO) - which outputs in this tx are owner UTXOs (same P2WSH scriptPubKey) """ - # Load raw tx hex (from filename or literal) if os.path.isfile(args.rawtx): - rawtx_hex = open(args.rawtx, "r").read().strip() + validate_file_path(args.rawtx) + with open(args.rawtx, "r") as f: + rawtx_hex = f.read().strip() else: rawtx_hex = args.rawtx.strip() @@ -1784,10 +1811,10 @@ def main(): ) tb.add_argument("--utxo", required=True, help="funding UTXO as TXID:VOUT") tb.add_argument( - "--value", required=True, type=int, help="value of funding UTXO in sats" + "--value", required=True, type=positive_int, help="value of funding UTXO in sats" ) tb.add_argument( - "--feerate", required=True, type=int, help="target feerate in sats/vbyte" + "--feerate", required=True, type=positive_int, help="target feerate in sats/vbyte" ) tb.add_argument("--change", help="bech32 change address (optional)") tb.set_defaults(func=cmd_txbuild) @@ -1829,10 +1856,10 @@ def main(): ) fp.add_argument("--utxo", required=True, help="funding UTXO as TXID:VOUT") fp.add_argument( - "--value", required=True, type=int, help="value of that UTXO in sats" + "--value", required=True, type=positive_int, help="value of that UTXO in sats" ) fp.add_argument( - "--feerate", required=True, type=int, help="target feerate in sats/vbyte" + "--feerate", required=True, type=positive_int, help="target feerate in sats/vbyte" ) fp.add_argument("--change", help="bech32 change address (recommended)") fp.add_argument( @@ -1886,7 +1913,7 @@ def main(): "--change", required=True, help="bech32 address to receive swept funds" ) rp.add_argument( - "--feerate", type=int, default=1, help="target feerate in sats/vbyte" + "--feerate", type=positive_int, default=1, help="target feerate in sats/vbyte" ) rp.set_defaults(func=cmd_revealpsbt) @@ -1957,7 +1984,7 @@ def main(): help="current owner UTXO as TXID:VOUT:VALUE", ) ot.add_argument( - "--feerate", type=int, default=1, help="target feerate in sats/vbyte" + "--feerate", type=positive_int, default=1, help="target feerate in sats/vbyte" ) ot.set_defaults(func=cmd_ownertransferpsbt) diff --git a/indexer.py b/indexer.py index 2bb464f..5d934cf 100644 --- a/indexer.py +++ b/indexer.py @@ -26,6 +26,7 @@ import json import re import hashlib +import zlib from pathlib import Path from typing import Optional, Tuple, List @@ -108,6 +109,7 @@ def save_manifest(manifest, manifest_path: Path): tmp = manifest_path.with_suffix(".tmp") with open(tmp, "w") as f: json.dump(manifest_sorted, f, indent=2) + os.chmod(tmp, 0o600) tmp.replace(manifest_path) @@ -131,6 +133,7 @@ def save_ownership(ownership, ownership_path: Path): tmp = ownership_path.with_suffix(".tmp") with open(tmp, "w") as f: json.dump(ownership, f, indent=2, sort_keys=True) + os.chmod(tmp, 0o600) tmp.replace(ownership_path) @@ -182,7 +185,7 @@ def try_extract_bpub_from_tx(rawtx_hex: str): """ try: tx = CTransaction.deserialize(bytes.fromhex(rawtx_hex)) - except Exception: + except (ValueError, TypeError): return None wit = getattr(tx, "wit", None) @@ -255,7 +258,7 @@ def try_extract_bpub_from_tx(rawtx_hex: str): try: raw_stream = decode_pubkeys_to_stream(all_data_pubkeys) meta, content = decode_stream(raw_stream) - except Exception: + except (ValueError, zlib.error): return None if "bpub_version" not in meta: @@ -280,7 +283,7 @@ def try_extract_owner_from_tx(rawtx_hex: str): """ try: tx = CTransaction.deserialize(bytes.fromhex(rawtx_hex)) - except Exception: + except (ValueError, TypeError): return None wit = getattr(tx, "wit", None) @@ -301,7 +304,7 @@ def try_extract_owner_from_tx(rawtx_hex: str): redeem_script = bytes(wstack[-1]) try: bpub_id_bytes, owner_h160 = decode_owner_redeem_script(redeem_script) - except Exception: + except ValueError: continue # We found an owner script; derive addresses and outputs. @@ -360,6 +363,7 @@ def save_state(state_path: Path, next_height: int): tmp = state_path.with_suffix(".tmp") with open(tmp, "w") as f: json.dump({"next_height": next_height}, f) + os.chmod(tmp, 0o600) tmp.replace(state_path) diff --git a/wizard.py b/wizard.py index e9773b2..70e816f 100644 --- a/wizard.py +++ b/wizard.py @@ -213,6 +213,7 @@ def get_or_create_seed(context: str) -> Optional[Tuple[bytes, str, str]]: f, indent=2, ) + os.chmod(SEED_FILE, 0o600) print(f"\nSeed stored in {SEED_FILE} (PLAINTEXT). Keep it secure.\n") except OSError as e: print(f"WARNING: Failed to write {SEED_FILE}: {e}")