-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils_cache.py
More file actions
189 lines (167 loc) · 7.25 KB
/
utils_cache.py
File metadata and controls
189 lines (167 loc) · 7.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Async and sync helpers for reading a process-local JSON cache (data/cache.json).
Purpose:
- Provide non-blocking accessors to cached price, 24h-change, RSI and MACD values stored in cache.json.
- Maintain an in-memory copy of the cache with simple mtime-based invalidation to reduce disk reads.
- Preserve synchronous accessors for legacy code while encouraging async usage for new code.
Concurrency and guarantees:
- Async loader serializes file reloads with an asyncio.Lock to prevent races.
- On any read error the functions log the exception and return None or an empty dict as appropriate.
- Cache keys use the canonical "SYMBOL_CURRENCY" uppercase form; callers should normalize symbols/currencies.
Example:
price = await get_price_cached_from_file_async("BTC", "USD")
price_sync = get_price_cached_from_file("BTC", "USD") # legacy synchronous usage
"""
import os
import json
import logging
import aiofiles
import asyncio
from config.config import USER_SETTINGS_FILE
CACHE_FILE = "data/cache.json"
logger = logging.getLogger("CoinTrackerBot.Cache")
# --- RAM Cache for cache.json ---
_cache_data = None
_cache_mtime = None
_cache_lock = asyncio.Lock()
async def _load_cache_async():
"""Load cache.json into RAM (async). Refreshes when file mtime changes.
Returns:
dict: Parsed cache contents or {} on error.
"""
global _cache_data, _cache_mtime
async with _cache_lock:
try:
mtime = os.path.getmtime(CACHE_FILE)
if _cache_data is None or _cache_mtime != mtime:
async with aiofiles.open(CACHE_FILE, "r") as f:
content = await f.read()
_cache_data = json.loads(content)
_cache_mtime = mtime
logger.debug(f"[CACHE] cache.json loaded from disk (mtime={mtime})")
else:
logger.debug(f"[CACHE] cache.json served from RAM (mtime={mtime})")
return _cache_data
except Exception as e:
logger.error(f"[CACHE] Error loading cache.json: {e}")
return {}
async def get_price_cached_from_file_async(symbol: str, currency: str = "USD"):
"""Async accessor for cached price.
Returns:
float | None: Cached price or None if not found / on error.
"""
try:
cache = await _load_cache_async()
key = f"{symbol.upper()}_{currency.upper()}"
price = cache.get(key, {}).get("price")
logger.info(f"[CACHE] get_price_cached_from_file_async: {key} -> {price}")
return price
except Exception as e:
logger.error(f"[CACHE] Error reading price from cache: {e}")
return None
async def get_24h_change_cached_from_file_async(symbol: str, currency: str = "USD"):
"""Async accessor for cached 24h percent change.
Returns:
float | None: Cached 24h change or None.
"""
try:
cache = await _load_cache_async()
key = f"{symbol.upper()}_{currency.upper()}"
change = cache.get(key, {}).get("24h_change")
logger.info(f"[CACHE] get_24h_change_cached_from_file_async: {key} -> {change}")
return change
except Exception as e:
logger.error(f"[CACHE] Error reading 24h change from cache: {e}")
return None
async def calculate_rsi_cached_from_file_async(symbol: str, period: int = 14, currency: str = "USD"):
"""Async accessor for cached RSI for a given period.
Returns:
float | None: Cached RSI value or None.
"""
try:
cache = await _load_cache_async()
key = f"{symbol.upper()}_{currency.upper()}"
rsi = cache.get(key, {}).get(f"rsi_{period}")
logger.info(f"[CACHE] calculate_rsi_cached_from_file_async: {key} (period={period}) -> {rsi}")
return rsi
except Exception as e:
logger.error(f"[CACHE] Error reading RSI from cache: {e}")
return None
async def get_macd_cached_from_file_async(symbol: str, currency: str = "USD"):
"""Async accessor for cached MACD payload.
Returns:
dict | None: Cached MACD data or None.
"""
try:
cache = await _load_cache_async()
key = f"{symbol.upper()}_{currency.upper()}"
macd = cache.get(key, {}).get("macd")
logger.info(f"[CACHE] get_macd_cached_from_file_async: {key} -> {macd}")
return macd
except Exception as e:
logger.error(f"[CACHE] Error reading MACD from cache: {e}")
return None
def _load_cache():
"""Synchronous loader for cache.json (legacy). Mirrors async loader behavior synchronously."""
global _cache_data, _cache_mtime
try:
mtime = os.path.getmtime(CACHE_FILE)
if _cache_data is None or _cache_mtime != mtime:
with open(CACHE_FILE, "r") as f:
_cache_data = json.load(f)
_cache_mtime = mtime
logger.debug(f"[CACHE] cache.json loaded from disk (mtime={mtime})")
else:
logger.debug(f"[CACHE] cache.json served from RAM (mtime={mtime})")
return _cache_data
except Exception as e:
logger.error(f"[CACHE] Error loading cache.json: {e}")
return {}
def get_price_cached_from_file(symbol: str, currency: str = "USD"):
"""Synchronous accessor for cached price (legacy)."""
try:
cache = _load_cache()
key = f"{symbol.upper()}_{currency.upper()}"
price = cache.get(key, {}).get("price")
logger.info(f"[CACHE] get_price_cached_from_file: {key} -> {price}")
return price
except Exception as e:
logger.error(f"[CACHE] Error reading price from cache: {e}")
return None
def get_24h_change_cached_from_file(symbol: str, currency: str = "USD"):
"""Synchronous accessor for cached 24h change (legacy)."""
try:
cache = _load_cache()
key = f"{symbol.upper()}_{currency.upper()}"
change = cache.get(key, {}).get("24h_change")
logger.info(f"[CACHE] get_24h_change_cached_from_file: {key} -> {change}")
return change
except Exception as e:
logger.error(f"[CACHE] Error reading 24h change from cache: {e}")
return None
def calculate_rsi_cached_from_file(symbol: str, period: int = 14, currency: str = "USD"):
"""Synchronous accessor for cached RSI (legacy)."""
try:
cache = _load_cache()
key = f"{symbol.upper()}_{currency.upper()}"
rsi = cache.get(key, {}).get(f"rsi_{period}")
logger.info(f"[CACHE] calculate_rsi_cached_from_file: {key} (period={period}) -> {rsi}")
return rsi
except Exception as e:
logger.error(f"[CACHE] Error reading RSI from cache: {e}")
return None
def get_macd_cached_from_file(symbol: str, currency: str = "USD"):
"""Synchronous accessor for cached MACD (legacy)."""
try:
cache = _load_cache()
key = f"{symbol.upper()}_{currency.upper()}"
macd = cache.get(key, {}).get("macd")
logger.info(f"[CACHE] get_macd_cached_from_file: {key} -> {macd}")
return macd
except Exception as e:
logger.error(f"[CACHE] Error reading MACD from cache: {e}")
return None
# Prefer the async helpers for non-blocking access patterns in new code.
# Example:
# price = await get_price_cached_from_file_async("BTC", "USD")
# change = await get_24h_change_cached_from_file_async("BTC", "USD")
# rsi = await calculate_rsi_cached_from_file_async("BTC", 14, "USD")