forked from StorWallet/openapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopenapi.py
More file actions
162 lines (119 loc) · 4.54 KB
/
openapi.py
File metadata and controls
162 lines (119 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import os
import json
from typing import List, Optional, Dict
import logzero
from logzero import logger
from fastapi import FastAPI, APIRouter, Request, Body, Depends, HTTPException
from fastapi.responses import JSONResponse
from aiocache import caches, cached
from pydantic import BaseModel
from rolls.rpc.full_node_rpc_client import FullNodeRpcClient
from rolls.util.bech32m import encode_puzzle_hash, decode_puzzle_hash as inner_decode_puzzle_hash
from rolls.types.coin_spend import CoinSpend
from rolls.types.blockchain_format.program import Program
import config as settings
# Configure aiocache from the project settings; the @cached endpoints below
# refer to the 'default' alias.
caches.set_config(settings.CACHE_CONFIG)

app = FastAPI()

# Log to <directory of this file>/logs/api.log.
cwd = os.path.dirname(__file__)
log_dir = os.path.join(cwd, "logs")
# makedirs(exist_ok=True) avoids the check-then-create race of
# "if not exists: mkdir" when several processes import this module at once.
os.makedirs(log_dir, exist_ok=True)
logzero.logfile(os.path.join(log_dir, "api.log"))
async def get_full_node_client() -> FullNodeRpcClient:
    """Create an RPC client for the full node described by the project settings.

    The host name and RPC port are read from ``settings.ROLLS_CONFIG``; the
    root path and the complete config are passed through to
    ``FullNodeRpcClient.create``.
    """
    node_config = settings.ROLLS_CONFIG
    hostname = node_config['self_hostname']
    rpc_port = node_config['full_node']['rpc_port']
    return await FullNodeRpcClient.create(
        hostname,
        rpc_port,
        settings.ROLLS_ROOT_PATH,
        settings.ROLLS_CONFIG,
    )
@app.on_event("startup")
async def startup():
    """Create the full-node client when the application starts.

    The client is stored on ``app.state.client`` so request handlers can
    reach it through ``request.app.state.client``.
    """
    client = await get_full_node_client()
    app.state.client = client
    # Issue one RPC right away to check the connection to the full node.
    await client.get_blockchain_state()
@app.on_event("shutdown")
async def shutdown():
    """Close the shared full-node client and wait until it has closed."""
    client = app.state.client
    client.close()
    await client.await_closed()
def to_hex(data: bytes):
    """Return *data* as a hexadecimal string without a ``0x`` prefix."""
    hex_text = data.hex()
    return hex_text
def decode_puzzle_hash(address):
    """Decode *address* into its puzzle hash.

    Wraps the library decoder so that a ``ValueError`` from it is reported
    to the API client as HTTP 400 ("Invalid Address").
    """
    try:
        puzzle_hash = inner_decode_puzzle_hash(address)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid Address")
    return puzzle_hash
def coin_to_json(coin):
    """Convert *coin* into a JSON-serialisable dict.

    The parent coin info and puzzle hash are hex-encoded with ``to_hex``;
    the amount is rendered with ``str``.
    """
    parent_hex = to_hex(coin.parent_coin_info)
    puzzle_hex = to_hex(coin.puzzle_hash)
    amount_text = str(coin.amount)
    return {
        'parent_coin_info': parent_hex,
        'puzzle_hash': puzzle_hex,
        'amount': amount_text,
    }
# Router that collects the endpoints defined below; it is mounted on `app`
# under the "/v1" prefix at the end of this module.
router = APIRouter()
class UTXO(BaseModel):
    """Response schema for one entry of the ``/utxos`` endpoint.

    The field names match the keys of the dict built by ``coin_to_json``.
    """

    # All three values are transported as strings.
    parent_coin_info: str
    puzzle_hash: str
    amount: str
@router.get("/utxos", response_model=List[UTXO])
@cached(ttl=10, key_builder=lambda *args, **kwargs: f"utxos:{kwargs['address']}", alias='default')
async def get_utxos(address: str, request: Request):
    """Return the unspent coins held by *address*.

    Results are cached for 10 seconds under the key ``utxos:<address>``.

    Args:
        address: Address whose coins are listed.
        request: Incoming request; supplies the shared full-node client via
            ``request.app.state.client``.

    Returns:
        A list of dicts built by ``coin_to_json``, one per unspent coin.

    Raises:
        HTTPException: 400 when *address* cannot be decoded.
    """
    # TODO: use a block indexer and support an ``unconfirmed`` parameter.
    pzh = decode_puzzle_hash(address)
    full_node_client = request.app.state.client
    # Only unspent coins are returned, so do not ask the node to load and
    # transfer the spent ones as well.
    coin_records = await full_node_client.get_coin_records_by_puzzle_hash(
        puzzle_hash=pzh, include_spent_coins=False
    )
    # The local filter is kept so the result never contains a spent coin,
    # whatever the node returns.
    return [coin_to_json(row.coin) for row in coin_records if not row.spent]
@router.post("/sendtx")
async def create_transaction(request: Request, item = Body({})):
    """Submit a transaction to the full node.

    Args:
        request: Incoming request; supplies the shared full-node client via
            ``request.app.state.client``.
        item: JSON request body. It must be an object with a ``coin_spend``
            entry that ``CoinSpend.from_json_dict`` can parse.

    Returns:
        A dict with the node's ``status`` and the hex ``id`` of the
        submitted object.

    Raises:
        HTTPException: 400 when ``coin_spend`` is missing from the body, or
            when the node rejects the transaction with a ``ValueError``.
    """
    # Reject a body without 'coin_spend' as a client error instead of letting
    # the lookup raise KeyError/TypeError, which would surface as HTTP 500.
    if not isinstance(item, dict) or 'coin_spend' not in item:
        raise HTTPException(400, "Missing coin_spend")
    # NOTE(review): the value is parsed as a CoinSpend but then passed to
    # push_tx and asked for .name() -- confirm this is the intended type.
    spb = CoinSpend.from_json_dict(item['coin_spend'])
    full_node_client = request.app.state.client

    try:
        resp = await full_node_client.push_tx(spb)
    except ValueError as e:
        logger.warning("sendtx: %s, error: %r", spb, e)
        raise HTTPException(400, str(e))

    return {
        'status': resp['status'],
        'id': spb.name().hex()
    }
class PecanRollsRpcParams(BaseModel):
    """Request body accepted by the ``/rolls_rpc`` endpoint."""

    # Appended to the full-node client's base URL to select the RPC to call.
    method: str
    # Sent as the JSON payload of the forwarded call; may be omitted.
    params: Optional[Dict] = None
@router.post('/rolls_rpc')
async def full_node_rpc(request: Request, item: PecanRollsRpcParams):
    """Forward an RPC call to the full node and return its JSON reply.

    ``item.method`` is appended to the client's base URL and ``item.params``
    is sent as the JSON body, reusing the client's session and SSL context.
    """
    # TODO: limit the allowed methods and add caching.
    # NOTE(review): `item.method` is client-supplied and goes into the URL
    # unchecked, so any path on the node's RPC server can be requested.
    client = request.app.state.client
    target_url = client.url + item.method
    async with client.session.post(target_url, json=item.params, ssl_context=client.ssl_context) as response:
        return await response.json()
async def get_user_balance(puzzle_hash: bytes, request: Request):
    """Return the summed amount of the unspent coins for *puzzle_hash*.

    Args:
        puzzle_hash: Puzzle hash whose coins are summed.
        request: Incoming request; supplies the shared full-node client via
            ``request.app.state.client``.

    Returns:
        The sum of ``coin.amount`` over all unspent coin records (0 when
        there are none).
    """
    full_node_client = request.app.state.client
    # Spent coins never count towards the balance, so do not ask the node to
    # load and transfer them.
    coin_records = await full_node_client.get_coin_records_by_puzzle_hash(
        puzzle_hash=puzzle_hash, include_spent_coins=False
    )
    # The spent check is kept as a safeguard and uses the same truthiness
    # test as get_utxos.
    return sum(record.coin.amount for record in coin_records if not record.spent)
@router.get('/balance')
@cached(ttl=10, key_builder=lambda *args, **kwargs: f"balance:{kwargs['address']}", alias='default')
async def query_balance(address, request: Request):
    """Return the balance of *address* as ``{'amount': <total>}``.

    Results are cached for 10 seconds under the key ``balance:<address>``.
    An address that cannot be decoded results in HTTP 400.
    """
    # TODO: use a block indexer and support an ``unconfirmed`` parameter.
    pzh = decode_puzzle_hash(address)
    total = await get_user_balance(pzh, request)
    return {'amount': total}
# Static token catalogue served by the /tokens endpoint; it currently holds
# a single entry.
DEFAULT_TOKEN_LIST = [
    {
        'chain': 'rolls',
        'id': 'rolls',
        'name': 'ROLLS',
        'symbol': 'ROLLS',
        'decimals': 12,
        'logo_url': 'https://pecanrolls.net/images/rolls-spinning-512.gif',
        'is_verified': True,
        'is_core': True,
    },
]
@router.get('/tokens')
async def list_tokens():
    """Return the static token catalogue (``DEFAULT_TOKEN_LIST``)."""
    tokens = DEFAULT_TOKEN_LIST
    return tokens
# Mount every endpoint registered on `router` under the "/v1" URL prefix.
app.include_router(router, prefix="/v1")