Skip to content
Open
Empty file added api/__init__.py
Empty file.
174 changes: 174 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import List, Optional, Union, Dict

from minichain import Blockchain, Block, State, Transaction
from minichain.merkle import MerkleTree
from minichain.mempool import Mempool
from minichain.mining import mine_and_process_block


blockchain: Optional[Blockchain] = None
mempool: Optional[Mempool] = None
pending_nonce_map: Dict[str, int] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
global blockchain, mempool
blockchain = Blockchain()
mempool = Mempool()
yield
blockchain.save_to_file()


app = FastAPI(title="MiniChain API", description="SPV-enabled blockchain API", lifespan=lifespan)


class TransactionResponse(BaseModel):
sender: str
receiver: Optional[str] = None
amount: int
nonce: int
data: Optional[Union[dict, str]] = None
timestamp: int
signature: Optional[str] = None
hash: Optional[str] = None


class BlockResponse(BaseModel):
index: int
previous_hash: str
merkle_root: Optional[str]
timestamp: int
difficulty: Optional[int]
nonce: int
hash: Optional[str] = None
transactions: List[TransactionResponse]
merkle_proofs: Optional[dict] = None


class VerifyTransactionResponse(BaseModel):
tx_hash: str
block_index: int
merkle_root: str
proof: List[dict]
verification_status: bool
message: str


class ChainInfo(BaseModel):
length: int
blocks: List[dict]


@app.get("/")
def root():
return {"message": "MiniChain API with SPV Support"}


@app.get("/chain", response_model=ChainInfo)
def get_chain():
chain_copy = blockchain.get_chain_copy()

return {
"length": len(chain_copy),
"blocks": [block.to_dict() for block in chain_copy]
}


@app.get("/block/{block_index}", response_model=BlockResponse)
def get_block(block_index: int):
chain_copy = blockchain.get_chain_copy()

if block_index < 0 or block_index >= len(chain_copy):
raise HTTPException(status_code=404, detail="Block not found")

block = chain_copy[block_index]

block_dict = block.to_dict()

merkle_proofs = {}
for i, _ in enumerate(block.transactions):
tx_hash = block.get_tx_hash(i)
if tx_hash:
proof = block.get_merkle_proof(i)
if proof is not None:
merkle_proofs[tx_hash] = proof

return {
**block_dict,
"merkle_proofs": merkle_proofs
}


@app.get("/verify_transaction", response_model=VerifyTransactionResponse)
def verify_transaction(
tx_hash: str = Query(..., description="Transaction hash to verify"),
block_index: int = Query(..., description="Block index to verify against")
):
chain_copy = blockchain.get_chain_copy()

if block_index < 0 or block_index >= len(chain_copy):
raise HTTPException(status_code=404, detail="Block not found")

block = chain_copy[block_index]

tx_found = False
tx_index = -1
for i, _ in enumerate(block.transactions):
tx_hash_computed = block.get_tx_hash(i)
if tx_hash_computed == tx_hash:
tx_found = True
tx_index = i
break

if not tx_found:
return {
"tx_hash": tx_hash,
"block_index": block_index,
"merkle_root": block.merkle_root or "",
"proof": [],
"verification_status": False,
"message": "Transaction not found in block"
}

proof = block.get_merkle_proof(tx_index)
merkle_root = block.merkle_root or ""

if proof is None:
return {
"tx_hash": tx_hash,
"block_index": block_index,
"merkle_root": merkle_root,
"proof": [],
"verification_status": False,
"message": "Failed to generate Merkle proof"
}

verification_status = MerkleTree.verify_proof(tx_hash, proof, merkle_root)

return {
"tx_hash": tx_hash,
"block_index": block_index,
"merkle_root": merkle_root,
"proof": proof,
"verification_status": verification_status,
"message": "Transaction verified successfully" if verification_status else "Verification failed"
}


@app.post("/mine")
def mine_block_endpoint():
block, *_ = mine_and_process_block(blockchain, mempool, pending_nonce_map)

if block:
return {"message": "Block mined successfully", "block": block.to_dict()}
else:
raise HTTPException(status_code=400, detail="Failed to mine block")
Comment on lines +162 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Unnecessary global declaration for pending_nonce_map.

pending_nonce_map is a mutable dict that is only mutated in-place (via pending_nonce_map[address] = ... in sync_nonce), not reassigned. The global declaration is superfluous.

♻️ Proposed fix
 `@app.post`("/mine")
 def mine_block_endpoint():
-    global pending_nonce_map
-    
     block, *_ = mine_and_process_block(blockchain, mempool, pending_nonce_map)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/main.py` around lines 162 - 171, Remove the unnecessary global
declaration in mine_block_endpoint: delete the line declaring global
pending_nonce_map and rely on in-place mutations of the dict (as performed in
sync_nonce) rather than reassigning it; ensure no reassignment to
pending_nonce_map occurs in mine_block_endpoint or mine_and_process_block—if you
need to reassign, change the code to return the new map instead of using global.
Use symbols to locate the change: mine_block_endpoint, pending_nonce_map,
sync_nonce, and mine_and_process_block.



if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
60 changes: 1 addition & 59 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,21 @@
import asyncio
import logging
import re
from nacl.signing import SigningKey
from nacl.encoding import HexEncoder

from minichain import Transaction, Blockchain, Block, State, Mempool, P2PNetwork, mine_block
from minichain.mining import mine_and_process_block, sync_nonce


logger = logging.getLogger(__name__)

BURN_ADDRESS = "0" * 40


def create_wallet():
sk = SigningKey.generate()
pk = sk.verify_key.encode(encoder=HexEncoder).decode()
return sk, pk


def mine_and_process_block(chain, mempool, pending_nonce_map):
"""
Mine block and let Blockchain handle validation + state updates.
DO NOT manually apply transactions again.
"""

pending_txs = mempool.get_transactions_for_block()

block = Block(
index=chain.last_block.index + 1,
previous_hash=chain.last_block.hash,
transactions=pending_txs,
)

mined_block = mine_block(block)

if not hasattr(mined_block, "miner"):
mined_block.miner = BURN_ADDRESS

deployed_contracts: list[str] = []

if chain.add_block(mined_block):
logger.info("Block #%s added", mined_block.index)

miner_attr = getattr(mined_block, "miner", None)
if isinstance(miner_attr, str) and re.match(r'^[0-9a-fA-F]{40}$', miner_attr):
miner_address = miner_attr
else:
logger.warning("Invalid miner address. Crediting burn address.")
miner_address = BURN_ADDRESS

# Reward must go through chain.state
chain.state.credit_mining_reward(miner_address)

for tx in mined_block.transactions:
sync_nonce(chain.state, pending_nonce_map, tx.sender)

# Track deployed contracts if your state.apply_transaction returns address
result = chain.state.get_account(tx.receiver) if tx.receiver else None
if isinstance(result, dict):
deployed_contracts.append(tx.receiver)

return mined_block, deployed_contracts
else:
logger.error("Block rejected by chain")
return None, []


def sync_nonce(state, pending_nonce_map, address):
account = state.get_account(address)
if account and "nonce" in account:
pending_nonce_map[address] = account["nonce"]
else:
pending_nonce_map[address] = 0


async def node_loop():
logger.info("Starting MiniChain Node with Smart Contracts")

Expand Down
47 changes: 16 additions & 31 deletions minichain/block.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,10 @@
import time
import hashlib
import json
import hashlib
from typing import List, Optional
from .transaction import Transaction

def _sha256(data: str) -> str:
return hashlib.sha256(data.encode()).hexdigest()


def _calculate_merkle_root(transactions: List[Transaction]) -> Optional[str]:
if not transactions:
return None

# Hash each transaction deterministically
tx_hashes = [
_sha256(json.dumps(tx.to_dict(), sort_keys=True))
for tx in transactions
]

# Build Merkle tree
while len(tx_hashes) > 1:
if len(tx_hashes) % 2 != 0:
tx_hashes.append(tx_hashes[-1]) # duplicate last if odd

new_level = []
for i in range(0, len(tx_hashes), 2):
combined = tx_hashes[i] + tx_hashes[i + 1]
new_level.append(_sha256(combined))

tx_hashes = new_level

return tx_hashes[0]
from .merkle import MerkleTree
from .utils import _sha256


class Block:
Expand All @@ -57,8 +31,8 @@ def __init__(
self.nonce: int = 0
self.hash: Optional[str] = None

# NEW: compute merkle root once
self.merkle_root: Optional[str] = _calculate_merkle_root(self.transactions)
self._merkle_tree = MerkleTree([tx.to_dict() for tx in self.transactions])
self.merkle_root: Optional[str] = self._merkle_tree.get_merkle_root()

# -------------------------
# HEADER (used for mining)
Expand Down Expand Up @@ -102,3 +76,14 @@ def compute_hash(self) -> str:
sort_keys=True
)
return _sha256(header_string)

# -------------------------
# MERKLE PROOF
# -------------------------
def get_merkle_proof(self, tx_index: int) -> Optional[List[dict]]:
return self._merkle_tree.get_proof(tx_index)

def get_tx_hash(self, tx_index: int) -> Optional[str]:
if tx_index < 0 or tx_index >= len(self._merkle_tree.tx_hashes):
return None
return self._merkle_tree.tx_hashes[tx_index]
Loading