-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbackends.py
More file actions
132 lines (102 loc) · 3.73 KB
/
backends.py
File metadata and controls
132 lines (102 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
__all__ = ['BaseFunctionCacheBackend', 'S3FunctionCacheBackend', 'get_cache_backend']
from functools import lru_cache
from hashlib import sha256
import json
import logging
import re
from django.conf import settings
from django.core.files.base import ContentFile
from django.utils.module_loading import import_string
ARGS_REGEX = re.compile(r'^args\[(\d+)\]$')
logger = logging.getLogger(__name__)
@lru_cache()
def get_cache_backend(name='default', keys=None, **kwargs):
try:
d = settings.FUNCTION_CACHE_BACKENDS[name]
except AttributeError:
if name == 'default':
d = dict(BACKEND='function_cache.backends.S3FunctionCacheBackend')
else:
raise
#end try
backend_class = import_string(d['BACKEND'])
options = d.get('OPTIONS', {})
if keys is None:
keys = options.pop('keys', None)
return backend_class(keys=keys, **options, **kwargs)
#end def
class BaseFunctionCacheBackend():
def __init__(self, keys=None, serializer=None, deserializer=None, **kwargs):
args_keys, kwargs_keys = [], []
if keys:
for k in keys:
m = ARGS_REGEX.match(k)
if m:
args_keys.append(int(m.group(1)))
else:
kwargs_keys.append(k)
#end if
#end for
#end if
self.args_keys = args_keys
self.kwargs_keys = kwargs_keys
self.serializer = serializer or self.default_serializer
self.deserializer = deserializer or self.default_deserializer
#end def
def default_serializer(self, result):
return json.dumps(result, ensure_ascii=True).encode('ascii')
def default_deserializer(self, content):
return json.loads(content)
def compute_key(self, args, kwargs):
if self.args_keys or self.kwargs_keys:
L = [args[i] for i in self.args_keys] + [kwargs.get(kw) for kw in self.kwargs_keys]
else:
L = list(args) + list(kwargs.values())
#end if
# Convert long strings or binary stuff to its sha hash instead
for i, elem in enumerate(L):
if isinstance(elem, str):
N = len(elem)
if N > 1024:
L[i] = sha256(elem.encode('utf-8')).hexdigest()
elif isinstance(elem, (bytes, bytearray)):
N = len(elem)
L[i] = sha256(elem).hexdigest()
#end if
#end for
return sha256(json.dumps(L, ensure_ascii=True).encode('ascii')).hexdigest()
#end def
def exists(self, cache_key):
raise NotImplementedError()
def get(self, cache_key):
raise NotImplementedError()
def put(self, cache_key, result):
raise NotImplementedError()
#end class
class DummyFunctionCacheBackend():
def exists(self, cache_key):
return False
def put(self, cache_key, result):
return
#end class
class S3FunctionCacheBackend(BaseFunctionCacheBackend):
def __init__(self, storage=None, **kwargs):
super().__init__(**kwargs)
self.storage = storage
assert storage is not None
#end def
def exists(self, cache_key):
hit = self.storage.exists(cache_key)
return hit
#end def
def get(self, cache_key):
content = self.storage.open(cache_key).read()
logger.debug(f'S3FunctionCacheBackend read {len(content)} bytes for cache key <{cache_key}>.')
return self.deserializer(content)
#end def
def put(self, cache_key, result):
content = self.serializer(result)
logger.debug(f'S3FunctionCacheBackend saved {len(content)} bytes for cache key <{cache_key}>.')
self.storage.save(cache_key, ContentFile(content))
#end def
#end class