-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_keys.py
More file actions
80 lines (63 loc) · 2.36 KB
/
api_keys.py
File metadata and controls
80 lines (63 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import json
import os
import hashlib
import secrets
from datetime import datetime, timezone
from typing import Optional, Dict
# Default path of the JSON file holding API key records; it is a relative
# path, so it resolves against the process's current working directory.
API_KEYS_FILE = "api_keys_db.json"
class APIKeyManager:
    """Manage API keys and their credit balances in a JSON file.

    The store has the shape ``{"keys": {<sha256 hex of key>: record}}``.
    Only the SHA-256 hash of each key is persisted; the raw key is returned
    once by :meth:`create_key` and cannot be recovered from the store.

    Every operation re-reads the file, and mutations rewrite it in full.
    No locking is performed, so concurrent writers can overwrite each
    other's updates.
    """

    def __init__(self, db_file: Optional[str] = None):
        """Open (creating if necessary) the key store.

        Args:
            db_file: Path of the JSON store. Defaults to ``API_KEYS_FILE``.
        """
        self.db_file = db_file if db_file is not None else API_KEYS_FILE
        self._ensure_db_exists()

    def _ensure_db_exists(self):
        """Create an empty store unless the file already exists."""
        try:
            # Mode "x" fails if the file exists, so an existing store is
            # never truncated even if it appears between check and create.
            with open(self.db_file, "x", encoding="utf-8") as f:
                json.dump({"keys": {}}, f, indent=2)
        except FileExistsError:
            pass

    def _read_db(self) -> Dict:
        """Load and return the whole store from disk."""
        with open(self.db_file, "r", encoding="utf-8") as f:
            return json.load(f)

    def _write_db(self, data: Dict):
        """Persist ``data`` as the new store contents.

        The data is written to a sibling temporary file that then replaces
        the store, so an interrupted write cannot leave a truncated or
        half-written JSON file behind.
        """
        tmp_path = f"{self.db_file}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, self.db_file)
        except Exception:
            # Do not leave a stale temporary file next to the store.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()``
        without calling the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def generate_key() -> str:
        """Return a new random API key with the ``cvapi_`` prefix."""
        return f"cvapi_{secrets.token_urlsafe(32)}"

    @staticmethod
    def hash_key(api_key: str) -> str:
        """Return the SHA-256 hex digest of ``api_key``."""
        return hashlib.sha256(api_key.encode()).hexdigest()

    def _lookup_active(self, db: Dict, api_key: str) -> Optional[Dict]:
        """Return the record for ``api_key`` in ``db`` if present and active."""
        record = db["keys"].get(self.hash_key(api_key))
        if not record or not record.get("is_active"):
            return None
        return record

    def create_key(self, user_email: str, credits: int) -> str:
        """Create and store a new active key, returning the raw key.

        Args:
            user_email: Email recorded on the key's record.
            credits: Initial value of ``credits_remaining``.

        Returns:
            The raw API key. Only its hash is stored.
        """
        db = self._read_db()
        api_key = self.generate_key()
        db["keys"][self.hash_key(api_key)] = {
            "user_email": user_email,
            "credits_remaining": credits,
            "created_at": self._utc_timestamp(),
            "last_used": None,
            "is_active": True,
        }
        self._write_db(db)
        return api_key

    def validate_key(self, api_key: str) -> Optional[Dict]:
        """Return the stored record for ``api_key``.

        Returns ``None`` when the key is unknown or not active.
        """
        return self._lookup_active(self._read_db(), api_key)

    def deduct_credits(self, api_key: str, amount: int) -> bool:
        """Subtract ``amount`` credits from the key and stamp ``last_used``.

        Returns:
            ``True`` if the deduction was applied and saved. ``False`` if
            the key is unknown or inactive, the balance is smaller than
            ``amount``, or ``amount`` is negative.
        """
        # A negative amount would pass the balance check below and
        # *increase* the balance, so refuse it outright.
        if amount < 0:
            return False
        db = self._read_db()
        record = self._lookup_active(db, api_key)
        if record is None or record["credits_remaining"] < amount:
            return False
        record["credits_remaining"] -= amount
        record["last_used"] = self._utc_timestamp()
        self._write_db(db)
        return True

    def get_credits(self, api_key: str) -> Optional[int]:
        """Return the remaining credits, or ``None`` if the key is invalid."""
        record = self.validate_key(api_key)
        return record["credits_remaining"] if record else None
# Shared module-level manager instance. Constructing it runs at import time
# and creates the key store file if it does not exist yet.
api_key_manager = APIKeyManager()