Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions bpub.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,9 @@ def decode_stream(payload: bytes):
if "bpub_id" in meta_json:
try:
meta["bpub_id"] = bytes.fromhex(meta_json["bpub_id"])
except Exception:
# if it doesn't parse, leave as-is in hex form
except ValueError:
meta["bpub_id"] = meta_json["bpub_id"]
except Exception:
except (zlib.error, json.JSONDecodeError, UnicodeDecodeError):
meta["meta_blob"] = meta_blob_enc

content_plain = _xor_obfuscate(content_enc)
Expand Down Expand Up @@ -711,13 +710,27 @@ def chunk_data_pubkeys(pubkeys):
yield chunk


def validate_file_path(path: str) -> str:
if os.path.isabs(path) or ".." in path.split(os.path.sep):
raise ValueError(f"Path traversal not allowed: {path}")
return path
Comment thread
kwsantiago marked this conversation as resolved.


def positive_int(value: str) -> int:
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"must be positive: {value}")
return ivalue


def load_psbt_from_arg(psbt_arg: str) -> PSBT:
"""
Helper: load a PSBT either from a file path or from a base64 string.
"""
# If it's a file on disk, read its contents
if os.path.isfile(psbt_arg):
b64 = open(psbt_arg, "r").read().strip()
validate_file_path(psbt_arg)
with open(psbt_arg, "r") as f:
b64 = f.read().strip()
else:
b64 = psbt_arg.strip()
Comment thread
kwsantiago marked this conversation as resolved.

Expand All @@ -733,7 +746,9 @@ def cmd_encode(args):
"""
encode: file -> BPUB stream hex (just a helper)
"""
data = open(args.file, "rb").read()
validate_file_path(args.file)
with open(args.file, "rb") as f:
data = f.read()
if args.legacy_v3:
stream = build_stream_v3_5(data, args.mime, args.filename, args.compress)
elif args.v4:
Expand All @@ -748,7 +763,9 @@ def cmd_decode(args):
decode: BPUB stream hex -> raw file bytes
"""
if os.path.isfile(args.stream_hex):
hexdata = open(args.stream_hex, "r").read().strip()
validate_file_path(args.stream_hex)
with open(args.stream_hex, "r") as f:
hexdata = f.read().strip()
else:
hexdata = args.stream_hex.strip()

Expand Down Expand Up @@ -786,7 +803,9 @@ def cmd_txbuild(args):
if len(control_pubkey) != 33:
sys.exit("control-pubkey must be 33-byte compressed pubkey hex")

data = open(args.file, "rb").read()
validate_file_path(args.file)
with open(args.file, "rb") as f:
data = f.read()

if args.legacy_v3 and args.v4:
sys.exit("Cannot specify both --legacy-v3 and --v4")
Expand Down Expand Up @@ -913,7 +932,9 @@ def cmd_fundpsbt(args):
if len(control_pubkey) != 33:
sys.exit("control-pubkey must be 33-byte compressed pubkey hex")

data = open(args.file, "rb").read()
validate_file_path(args.file)
with open(args.file, "rb") as f:
data = f.read()

if args.legacy_v3 and args.v4:
sys.exit("Cannot specify both --legacy-v3 and --v4")
Expand Down Expand Up @@ -1069,7 +1090,9 @@ def cmd_revealpsbt(args):
if not args.bpub_utxo:
sys.exit("Need at least one --bpub-utxo TXID:VOUT:VALUE")

data = open(args.file, "rb").read()
validate_file_path(args.file)
with open(args.file, "rb") as f:
data = f.read()
if args.legacy_v3:
stream = build_stream_v3_5(data, args.mime, args.filename, args.compress)
version_label = "v3.5 legacy"
Expand Down Expand Up @@ -1375,9 +1398,10 @@ def cmd_txrecover(args):
- After collecting all data pubkeys, we decode them back into a BPUB stream and
then into the original file (v3.5, v4, or v5).
"""

if os.path.isfile(args.rawtx):
rawtx_hex = open(args.rawtx, "r").read().strip()
validate_file_path(args.rawtx)
with open(args.rawtx, "r") as f:
rawtx_hex = f.read().strip()
else:
rawtx_hex = args.rawtx.strip()

Expand Down Expand Up @@ -1543,7 +1567,9 @@ def cmd_ownertransferpsbt(args):
)

if args.file:
data = open(args.file, "rb").read()
validate_file_path(args.file)
with open(args.file, "rb") as f:
data = f.read()
bpub_id = compute_bpub_v5_id(data)
else:
try:
Expand Down Expand Up @@ -1619,9 +1645,10 @@ def cmd_decodetransfer(args):
- owner P2WSH address (the script hash used for the owner UTXO)
- which outputs in this tx are owner UTXOs (same P2WSH scriptPubKey)
"""
# Load raw tx hex (from filename or literal)
if os.path.isfile(args.rawtx):
rawtx_hex = open(args.rawtx, "r").read().strip()
validate_file_path(args.rawtx)
with open(args.rawtx, "r") as f:
rawtx_hex = f.read().strip()
else:
rawtx_hex = args.rawtx.strip()

Expand Down Expand Up @@ -1784,10 +1811,10 @@ def main():
)
tb.add_argument("--utxo", required=True, help="funding UTXO as TXID:VOUT")
tb.add_argument(
"--value", required=True, type=int, help="value of funding UTXO in sats"
"--value", required=True, type=positive_int, help="value of funding UTXO in sats"
)
tb.add_argument(
"--feerate", required=True, type=int, help="target feerate in sats/vbyte"
"--feerate", required=True, type=positive_int, help="target feerate in sats/vbyte"
)
tb.add_argument("--change", help="bech32 change address (optional)")
tb.set_defaults(func=cmd_txbuild)
Expand Down Expand Up @@ -1829,10 +1856,10 @@ def main():
)
fp.add_argument("--utxo", required=True, help="funding UTXO as TXID:VOUT")
fp.add_argument(
"--value", required=True, type=int, help="value of that UTXO in sats"
"--value", required=True, type=positive_int, help="value of that UTXO in sats"
)
fp.add_argument(
"--feerate", required=True, type=int, help="target feerate in sats/vbyte"
"--feerate", required=True, type=positive_int, help="target feerate in sats/vbyte"
)
fp.add_argument("--change", help="bech32 change address (recommended)")
fp.add_argument(
Expand Down Expand Up @@ -1886,7 +1913,7 @@ def main():
"--change", required=True, help="bech32 address to receive swept funds"
)
rp.add_argument(
"--feerate", type=int, default=1, help="target feerate in sats/vbyte"
"--feerate", type=positive_int, default=1, help="target feerate in sats/vbyte"
)
rp.set_defaults(func=cmd_revealpsbt)

Expand Down Expand Up @@ -1957,7 +1984,7 @@ def main():
help="current owner UTXO as TXID:VOUT:VALUE",
)
ot.add_argument(
"--feerate", type=int, default=1, help="target feerate in sats/vbyte"
"--feerate", type=positive_int, default=1, help="target feerate in sats/vbyte"
)
ot.set_defaults(func=cmd_ownertransferpsbt)

Expand Down
12 changes: 8 additions & 4 deletions indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import json
import re
import hashlib
import zlib
from pathlib import Path
from typing import Optional, Tuple, List

Expand Down Expand Up @@ -108,6 +109,7 @@ def save_manifest(manifest, manifest_path: Path):
tmp = manifest_path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(manifest_sorted, f, indent=2)
os.chmod(tmp, 0o600)
tmp.replace(manifest_path)


Expand All @@ -131,6 +133,7 @@ def save_ownership(ownership, ownership_path: Path):
tmp = ownership_path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(ownership, f, indent=2, sort_keys=True)
os.chmod(tmp, 0o600)
tmp.replace(ownership_path)


Expand Down Expand Up @@ -182,7 +185,7 @@ def try_extract_bpub_from_tx(rawtx_hex: str):
"""
try:
tx = CTransaction.deserialize(bytes.fromhex(rawtx_hex))
except Exception:
except (ValueError, TypeError):
return None

wit = getattr(tx, "wit", None)
Expand Down Expand Up @@ -255,7 +258,7 @@ def try_extract_bpub_from_tx(rawtx_hex: str):
try:
raw_stream = decode_pubkeys_to_stream(all_data_pubkeys)
meta, content = decode_stream(raw_stream)
except Exception:
except (ValueError, zlib.error):
return None

if "bpub_version" not in meta:
Expand All @@ -280,7 +283,7 @@ def try_extract_owner_from_tx(rawtx_hex: str):
"""
try:
tx = CTransaction.deserialize(bytes.fromhex(rawtx_hex))
except Exception:
except (ValueError, TypeError):
return None

wit = getattr(tx, "wit", None)
Expand All @@ -301,7 +304,7 @@ def try_extract_owner_from_tx(rawtx_hex: str):
redeem_script = bytes(wstack[-1])
try:
bpub_id_bytes, owner_h160 = decode_owner_redeem_script(redeem_script)
except Exception:
except ValueError:
continue

# We found an owner script; derive addresses and outputs.
Expand Down Expand Up @@ -360,6 +363,7 @@ def save_state(state_path: Path, next_height: int):
tmp = state_path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump({"next_height": next_height}, f)
os.chmod(tmp, 0o600)
tmp.replace(state_path)


Expand Down
1 change: 1 addition & 0 deletions wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def get_or_create_seed(context: str) -> Optional[Tuple[bytes, str, str]]:
f,
indent=2,
)
os.chmod(SEED_FILE, 0o600)
print(f"\nSeed stored in {SEED_FILE} (PLAINTEXT). Keep it secure.\n")
except OSError as e:
print(f"WARNING: Failed to write {SEED_FILE}: {e}")
Expand Down