-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemory_server.py
More file actions
1819 lines (1564 loc) · 69.5 KB
/
memory_server.py
File metadata and controls
1819 lines (1564 loc) · 69.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Memory Server — persistent HTTP service for OpenClaw's memory systems.
Keeps the SentenceTransformer model loaded in memory, shared across
Memora and MSA. Agent calls via curl or memory_cli.py instead of
spawning a new python3 process each time.
Zero external dependencies beyond what's already installed.
Usage:
python3 memory_server.py # foreground
python3 memory_server.py --port 18790 # custom port
python3 memory_server.py --daemon # background (writes PID file)
"""
import argparse
import json
import logging
import os
import signal
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from socketserver import ThreadingMixIn
from typing import Any, Callable, Dict, Optional
# Module-wide logging: terse single-line records tagged [memory-server].
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [memory-server] %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("memory-server")
# Workspace root: all state files referenced below live under <workspace>/memory/.
_WORKSPACE = Path(__file__).parent
# Process start time — presumably used for uptime reporting by the HTTP layer (not visible here).
_START_TIME = time.time()
# Which embedding backend is active; set by _load_shared_embedder() to
# "SentenceTransformer" or "MockEmbedder".
_EMBEDDER_TYPE = "unknown"
def _load_env():
    """Load key=value pairs from OpenClaw's .env into os.environ.

    Existing environment variables always win (setdefault); comment
    lines and lines without '=' are ignored.
    """
    dotenv = Path.home() / ".openclaw" / ".env"
    if not dotenv.exists():
        return
    for raw in dotenv.read_text().splitlines():
        entry = raw.strip()
        if entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        os.environ.setdefault(key.strip(), value.strip())
def _setup_hf_offline():
    """Configure HuggingFace libraries for offline/mirror mode.

    Uses setdefault so an operator's explicit setting is never overridden.
    """
    for flag in ("HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE"):
        os.environ.setdefault(flag, "1")
def _force_cpu_for_daemon():
    """MPS (Apple GPU) is unreliable after setsid(); force CPU.

    Hides CUDA devices and disables the MPS backend via environment
    variables, then best-effort pins torch's default device to CPU.
    """
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    os.environ["PYTORCH_MPS_ENABLED"] = "0"
    try:
        import torch
    except Exception:
        return
    try:
        torch.set_default_device("cpu")
    except Exception:
        # torch present but too old / misconfigured — env vars above still apply
        pass
def _load_shared_embedder():
    """Load the embedding backend once and publish it via ``shared_embedder``.

    Tries to load the real SentenceTransformer model; if that fails
    (model files missing, offline, torch broken, ...) falls back to a
    deterministic hash-based mock so the rest of the server keeps
    working. Updates the module-global _EMBEDDER_TYPE to reflect which
    backend is active.

    Returns:
        The embedder instance that was registered with shared_embedder.
    """
    global _EMBEDDER_TYPE
    import shared_embedder
    try:
        from sentence_transformers import SentenceTransformer
        import numpy as np
        model_name = "nomic-ai/nomic-embed-text-v1.5"
        logger.info("Loading SentenceTransformer: %s ...", model_name)
        # _force_cpu_for_daemon() sets PYTORCH_MPS_ENABLED=0; honor that here.
        device = "cpu" if os.environ.get("PYTORCH_MPS_ENABLED") == "0" else None
        model = SentenceTransformer(model_name, trust_remote_code=True, device=device)
        logger.info("Model loaded successfully")

        class SharedSentenceEmbedder:
            """Wraps SentenceTransformer with a task-prefixed embed interface.

            nomic-embed-text-v1.5 requires task prefixes for best retrieval quality:
            - "search_document: " for stored content
            - "search_query: " for search queries
            """
            def __init__(self, m, dim=768):
                self._model = m
                self.dimension = dim

            def embed(self, text: str):
                """Embed stored content; returns a plain list of floats."""
                return self._model.encode(f"search_document: {text}", normalize_embeddings=True).tolist()

            def embed_document(self, text: str):
                """Alias of embed() — document-side prefix."""
                return self._model.encode(f"search_document: {text}", normalize_embeddings=True).tolist()

            def embed_query(self, text: str):
                """Embed a search query (query-side prefix)."""
                return self._model.encode(f"search_query: {text}", normalize_embeddings=True).tolist()

            def embed_batch(self, texts):
                """Embed many documents at once; returns float32 ndarray."""
                prefixed = [f"search_document: {t}" for t in texts]
                return self._model.encode(prefixed, normalize_embeddings=True).astype(np.float32)

            def embed_np(self, text: str):
                """Embed one document; returns float32 ndarray."""
                return self._model.encode(f"search_document: {text}", normalize_embeddings=True).astype(np.float32)

        emb = SharedSentenceEmbedder(model)
        shared_embedder.set(emb)
        _EMBEDDER_TYPE = "SentenceTransformer"
        logger.info("Shared embedder ready (SentenceTransformer)")
        return emb
    except Exception as e:
        logger.warning("SentenceTransformer load failed (%s), using MockEmbedder", e)
        import hashlib
        import numpy as np

        class SharedMockEmbedder:
            """Deterministic hash-based stand-in exposing the SAME interface
            as SharedSentenceEmbedder so callers never need to know which
            backend is active."""
            def __init__(self, dim=768):
                self.dimension = dim

            def embed(self, text: str):
                """Map text to a stable unit vector: SHA-256 hex pairs -> [0,1],
                padded with 0.5 up to `dimension`, then L2-normalized."""
                digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
                raw = [int(digest[i:i+2], 16) / 255.0 for i in range(0, min(len(digest), self.dimension*2), 2)]
                while len(raw) < self.dimension:
                    raw.append(0.5)
                vec = np.array(raw[:self.dimension], dtype=np.float32)
                norm = np.linalg.norm(vec)
                if norm > 0:
                    vec /= norm
                return vec.tolist()

            # FIX: the mock previously lacked embed_document()/embed_query(),
            # so any caller using the document/query API crashed whenever the
            # real model failed to load. Mirror the real embedder's interface.
            def embed_document(self, text: str):
                return self.embed(text)

            def embed_query(self, text: str):
                return self.embed(text)

            def embed_batch(self, texts):
                return np.array([np.array(self.embed(t), dtype=np.float32) for t in texts])

            def embed_np(self, text: str):
                return np.array(self.embed(text), dtype=np.float32)

        emb = SharedMockEmbedder()
        shared_embedder.set(emb)
        _EMBEDDER_TYPE = "MockEmbedder"
        logger.info("Shared embedder ready (MockEmbedder fallback)")
        return emb
# Lazily-created memory hub singleton, plus a flag so the post-remember /
# post-recall hooks are registered exactly once per process.
_hub = None
_hub_hooks_registered = False
def _get_hub():
    """Return the process-wide memory hub, importing it on first use.

    Also wires the KG-extraction (post-remember) and access-tracking
    (post-recall) hooks, guarded so they are registered only once.
    """
    global _hub, _hub_hooks_registered
    if _hub is None:
        from memory_hub import hub as _imported_hub
        _hub = _imported_hub
    if not _hub_hooks_registered:
        def _on_remember(content, imp, res):
            return _kg_extract_async(content, imp)

        def _on_recall(merged, query):
            return _track_access_async(merged, query)

        _hub.register_post_remember_hook(_on_remember)
        _hub.register_post_recall_hook(_on_recall)
        _hub_hooks_registered = True
    return _hub
# ── Telegram Pusher ───────────────────────────────────────────
class TelegramPusher:
    """Sends messages to the user's Telegram via OpenClaw's bot."""

    def __init__(self):
        self._token: Optional[str] = None
        self._chat_id: Optional[str] = None
        self._loaded = False

    def _load_config(self):
        """Read bot token and the first allowed chat id from OpenClaw's
        config files. Runs at most once; failures leave token/chat unset."""
        if self._loaded:
            return
        self._loaded = True
        try:
            cfg_path = Path.home() / ".openclaw" / "openclaw.json"
            if cfg_path.exists():
                cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
                # Prefer channels.telegram, fall back to a top-level telegram key.
                tg_cfg = cfg.get("channels", {}).get("telegram", {}) or cfg.get("telegram", {})
                self._token = tg_cfg.get("botToken")
            allow_path = Path.home() / ".openclaw" / "credentials" / "telegram-default-allowFrom.json"
            if allow_path.exists():
                parsed = json.loads(allow_path.read_text(encoding="utf-8"))
                candidates = parsed.get("allowFrom", parsed) if isinstance(parsed, dict) else parsed
                if isinstance(candidates, list) and candidates:
                    # First allowed id becomes the push target.
                    self._chat_id = str(candidates[0])
        except Exception as e:
            logger.warning("TelegramPusher config load failed: %s", e)

    def push(self, text: str, parse_mode: str = "Markdown") -> bool:
        """Send *text* (truncated to 4000 chars) to the configured chat.

        Returns True on confirmed delivery, False otherwise; never raises.
        """
        self._load_config()
        if not (self._token and self._chat_id):
            logger.debug("Telegram push skipped: no token or chat_id")
            return False
        try:
            from urllib.request import Request, urlopen
            url = f"https://api.telegram.org/bot{self._token}/sendMessage"
            body = json.dumps({
                "chat_id": self._chat_id,
                "text": text[:4000],
                "parse_mode": parse_mode,
            }).encode("utf-8")
            request = Request(url, data=body, headers={"Content-Type": "application/json"})
            with urlopen(request, timeout=15) as resp:
                reply = json.loads(resp.read())
            if reply.get("ok"):
                logger.info("Telegram push OK (msg_id=%s)", reply["result"]["message_id"])
                return True
            logger.warning("Telegram API error: %s", reply)
            return False
        except Exception as e:
            logger.warning("Telegram push failed: %s", e)
            return False
_telegram = TelegramPusher()
# ── Auto-Ingestor ─────────────────────────────────────────────
class AutoIngestor:
    """Background thread that watches daily .md files and auto-ingests new content.

    Dedup strategy:
    - Track processed line count per file (skip already-processed lines)
    - Track content hashes of ingested paragraphs (skip exact duplicates)
    - VectorStore.add() also deduplicates by prefix match
    - Ingest directly into subsystems instead of hub.remember()
      to avoid writing back to the daily file (which creates a feedback loop)
    """
    # State files, relative to the workspace root.
    _STATE_FILE = "memory/ingestion_state.json"        # fname -> processed line count
    _HASHES_FILE = "memory/ingestion_hashes.json"      # hashes of ingested paragraphs
    _MSA_STATE_FILE = "memory/msa_ingestion_state.json"  # fname -> chars synced to MSA
    _INTERVAL = 1800  # 30 minutes
    _MSA_MIN_CHARS = 200  # skip MSA sync for files smaller than this

    def __init__(self):
        self._state_path = _WORKSPACE / self._STATE_FILE
        self._hashes_path = _WORKSPACE / self._HASHES_FILE
        self._msa_state_path = _WORKSPACE / self._MSA_STATE_FILE
        # fname -> number of lines already processed for Memora ingestion
        self._state: Dict[str, int] = {}
        # content hashes of paragraphs already ingested (exact-duplicate guard)
        self._seen_hashes: set = set()
        # fname -> file size (chars) at last MSA sync
        self._msa_state: Dict[str, int] = {}
        self._load_state()

    def _load_state(self):
        """Restore all three state stores from disk; any parse/IO failure
        resets that store to empty (safe: worst case is re-ingestion,
        which downstream dedup absorbs)."""
        try:
            if self._state_path.exists():
                self._state = json.loads(self._state_path.read_text(encoding="utf-8"))
        except Exception:
            self._state = {}
        try:
            if self._hashes_path.exists():
                self._seen_hashes = set(json.loads(self._hashes_path.read_text(encoding="utf-8")))
        except Exception:
            self._seen_hashes = set()
        try:
            if self._msa_state_path.exists():
                self._msa_state = json.loads(self._msa_state_path.read_text(encoding="utf-8"))
        except Exception:
            self._msa_state = {}

    def _save_state(self):
        """Persist all three state stores; best-effort (logged on failure)."""
        try:
            self._state_path.parent.mkdir(parents=True, exist_ok=True)
            self._state_path.write_text(
                json.dumps(self._state, ensure_ascii=False, indent=2), encoding="utf-8"
            )
            self._hashes_path.write_text(
                json.dumps(list(self._seen_hashes), ensure_ascii=False), encoding="utf-8"
            )
            self._msa_state_path.write_text(
                json.dumps(self._msa_state, ensure_ascii=False, indent=2), encoding="utf-8"
            )
        except Exception as e:
            logger.warning("AutoIngestor: failed to save state: %s", e)

    @staticmethod
    def _content_hash(text: str) -> str:
        """Stable 24-hex-char digest of the stripped text, used for dedup."""
        import hashlib
        return hashlib.sha256(text.strip().encode("utf-8")).hexdigest()[:24]

    def scan_and_ingest(self):
        """One full pass: ingest new paragraphs from daily logs into Memora,
        then sync whole files to MSA, then persist state."""
        memory_dir = _WORKSPACE / "memory"
        if not memory_dir.exists():
            return
        # Daily logs are named YYYY-MM-DD.md.
        md_files = sorted(memory_dir.glob("20??-??-??.md"))
        total_ingested = 0
        for md_file in md_files:
            fname = md_file.name
            try:
                lines = md_file.read_text(encoding="utf-8").splitlines()
            except Exception:
                continue
            # Only look at lines appended since the last pass.
            processed_up_to = self._state.get(fname, 0)
            if len(lines) <= processed_up_to:
                continue
            new_lines = lines[processed_up_to:]
            paragraphs = self._split_paragraphs(new_lines)
            for para in paragraphs:
                text = para.strip()
                if len(text) < 15:
                    continue  # too short to be a meaningful memory
                if self._is_test_content(text):
                    continue
                h = self._content_hash(text)
                if h in self._seen_hashes:
                    continue
                # Safety gate (optional module): blocked content is hashed too,
                # so it is never re-examined on later passes.
                try:
                    from memory_security import check_content_safety, SecurityAuditLogger
                    safety = check_content_safety(text)
                    if not safety.is_safe:
                        _audit = SecurityAuditLogger()
                        _audit.log_write_rejection(
                            "auto_ingest_blocked", safety.reason,
                            text[:200], "auto_ingest")
                        logger.warning("AutoIngestor: blocked unsafe content: %s",
                                       safety.reason)
                        self._seen_hashes.add(h)
                        continue
                except ImportError:
                    pass
                # Write directly to the vector store (NOT hub.remember) to
                # avoid the daily-file feedback loop described in the class doc.
                try:
                    from memora.vectorstore import vector_store
                    if vector_store.contains(text):
                        self._seen_hashes.add(h)
                        continue
                    vector_store.add(text, metadata={
                        "source": "auto_ingest",
                        "importance": 0.6,
                        "timestamp": datetime.now().isoformat(),
                    })
                    _kg_extract_async(text, 0.6)
                    self._seen_hashes.add(h)
                    total_ingested += 1
                except Exception as e:
                    logger.warning("AutoIngestor: ingest failed: %s", e)
            # Mark the whole file processed even if some paragraphs failed.
            self._state[fname] = len(lines)
        if total_ingested > 0:
            logger.info("AutoIngestor: ingested %d new paragraphs to Memora", total_ingested)
        msa_ingested = self._sync_daily_to_msa(md_files)
        if msa_ingested > 0:
            logger.info("AutoIngestor: synced %d daily files to MSA", msa_ingested)
        self._save_state()

    def _sync_daily_to_msa(self, md_files: list) -> int:
        """Ingest each daily .md file as a whole MSA document for cross-day deep-recall.

        After MSA ingestion, also trigger KG extraction on each chunk so the
        knowledge graph can capture cross-day relationships.
        Returns the number of files synced this pass.
        """
        synced = 0
        for md_file in md_files:
            fname = md_file.name
            try:
                content = md_file.read_text(encoding="utf-8")
            except Exception:
                continue
            file_size = len(content)
            if file_size < self._MSA_MIN_CHARS:
                continue
            # Re-sync only when the file has grown since the last sync.
            prev_size = self._msa_state.get(fname, 0)
            if file_size <= prev_size:
                continue
            doc_id = f"daily-{md_file.stem}"
            date_str = md_file.stem
            try:
                from msa.bridge import bridge as msa_bridge
                result = msa_bridge.ingest_and_save(
                    content,
                    source="daily_log",
                    doc_id=doc_id,
                    metadata={"title": f"Daily Log {date_str}", "source": "daily_log", "date": date_str},
                    cross_index=False,
                    write_daily=False,
                )
                self._msa_state[fname] = file_size
                synced += 1
                logger.info("AutoIngestor: MSA synced %s (%d chars)", fname, file_size)
                self._kg_extract_from_msa_doc(doc_id)
            except Exception as e:
                logger.warning("AutoIngestor: MSA sync failed for %s: %s", fname, e)
        return synced

    def _kg_extract_from_msa_doc(self, doc_id: str):
        """Run KG extraction on each chunk of an MSA document (non-blocking)."""
        try:
            from msa.system import msa_system
            chunks, _ = msa_system.memory_bank.load_document_content(doc_id)
            if not chunks:
                return
            for chunk in chunks:
                text = chunk.strip()
                if len(text) >= 50:  # skip trivial chunks
                    _kg_extract_async(text, 0.65)
            logger.info("AutoIngestor: queued KG extraction for %s (%d chunks)", doc_id, len(chunks))
        except Exception as e:
            logger.warning("AutoIngestor: KG extract from MSA doc %s failed: %s", doc_id, e)

    def _split_paragraphs(self, lines: list) -> list:
        """Split lines into paragraphs using ### HH:MM:SS headers as delimiters.

        Header lines themselves are dropped; blank lines inside a paragraph
        are skipped."""
        import re
        paragraphs = []
        current = []
        for line in lines:
            if re.match(r'^###\s+\d{2}:\d{2}:\d{2}', line):
                if current:
                    paragraphs.append("\n".join(current))
                    current = []
            else:
                if line.strip():
                    current.append(line)
        if current:
            paragraphs.append("\n".join(current))
        return paragraphs

    def _is_test_content(self, text: str) -> bool:
        """Heuristic: short snippets containing test markers are considered
        test noise and skipped. Longer texts are never filtered."""
        lower = text.lower()
        test_markers = [
            "test", "测试", "[chronos/test]", "[hub/test]",
            "test content", "test query",
        ]
        if len(text) < 30 and any(m in lower for m in test_markers):
            return True
        return False

    def start(self):
        """Launch the background scan loop as a daemon thread."""
        def _loop():
            while True:
                try:
                    self.scan_and_ingest()
                except Exception as e:
                    logger.error("AutoIngestor error: %s", e, exc_info=True)
                time.sleep(self._INTERVAL)
        t = threading.Thread(target=_loop, daemon=True, name="auto-ingestor")
        t.start()
        logger.info("AutoIngestor started (interval=%ds)", self._INTERVAL)
# ── Scheduler ─────────────────────────────────────────────────
class Scheduler:
    """Built-in periodic task scheduler with Telegram push capability."""
    # Persisted map of task name -> last-run unix timestamp.
    _STATE_FILE = "memory/scheduler_state.json"

    def __init__(self, pusher: TelegramPusher):
        self._pusher = pusher
        self._state_path = _WORKSPACE / self._STATE_FILE
        # task name -> last-run unix timestamp
        self._state: Dict[str, float] = {}
        self._load_state()
        # Task registry: "interval" is seconds between runs; optional "hour"
        # additionally holds the task until that local hour (see _should_run).
        self._tasks = {
            "morning_briefing": {
                "interval": 86400,  # daily
                "hour": 8,          # not before 8am local time
                "fn": self._task_morning_briefing,
            },
            "collision": {
                "interval": 21600,  # every 6h
                "fn": self._task_collision,
            },
            "chronos_consolidate": {
                "interval": 21600,  # every 6h
                "fn": self._task_consolidate,
            },
            "digest": {
                "interval": 86400,  # daily
                "fn": self._task_digest,
            },
            "dormant_check": {
                "interval": 259200,  # every 3 days
                "fn": self._task_dormant_check,
            },
            "kg_contradiction_scan": {
                "interval": 86400,  # daily
                "fn": self._task_contradiction_scan,
            },
            "blindspot_scan": {
                "interval": 604800,  # weekly
                "fn": self._task_blindspot_scan,
            },
            "skill_proposal": {
                "interval": 86400,  # daily
                "fn": self._task_skill_proposal,
            },
        }

    def _load_state(self):
        """Restore last-run timestamps from disk; reset on any failure."""
        try:
            if self._state_path.exists():
                self._state = json.loads(self._state_path.read_text(encoding="utf-8"))
        except Exception:
            self._state = {}

    def _save_state(self):
        """Persist last-run timestamps; best-effort (logged on failure)."""
        try:
            self._state_path.parent.mkdir(parents=True, exist_ok=True)
            self._state_path.write_text(
                json.dumps(self._state, ensure_ascii=False, indent=2), encoding="utf-8"
            )
        except Exception as e:
            logger.warning("Scheduler: failed to save state: %s", e)

    def _should_run(self, task_name: str, interval: float, hour: Optional[int] = None) -> bool:
        """True when *interval* seconds have elapsed since the task last ran.

        If *hour* is given, additionally hold the task until the local clock
        reaches that hour (used to keep the morning briefing after 8am).
        """
        last = self._state.get(task_name, 0)
        now = time.time()
        if now - last < interval:
            return False
        if hour is not None:
            current_hour = datetime.now().hour
            if current_hour < hour:
                return False
        return True

    def tick(self):
        """Run every due task once; record and persist the run time after
        each success. A failing task is logged and does not block the rest."""
        for name, cfg in self._tasks.items():
            try:
                if self._should_run(name, cfg["interval"], cfg.get("hour")):
                    logger.info("Scheduler: running %s", name)
                    cfg["fn"]()
                    self._state[name] = time.time()
                    self._save_state()
            except Exception as e:
                logger.error("Scheduler task %s failed: %s", name, e, exc_info=True)

    def _task_morning_briefing(self):
        """Push the daily briefing (humanized if an LLM is available)."""
        try:
            from second_brain.bridge import bridge as sb_bridge
            data = sb_bridge.daily_briefing()
            text = self._humanize_briefing(data)
            self._pusher.push(text)
        except Exception as e:
            logger.error("Morning briefing failed: %s", e)

    def _humanize_briefing(self, data: dict) -> str:
        """Generate a warm, human briefing using LLM if available.

        Falls back to the raw briefing text on any failure.
        """
        try:
            import llm_client
            if llm_client.is_available():
                raw_text = data.get("text", "")
                stats = (
                    f"记忆总量={data.get('total_memories', 0)}, "
                    f"灵感={data.get('insight_count', 0)}, "
                    f"沉睡={data.get('dormant_count', 0)}, "
                    f"趋势={data.get('trend_count', 0)}, "
                    f"高活力={data.get('vitality_distribution', {}).get('high', 0)}"
                )
                personality = self._load_personality_style()
                prompt = (
                    f"你是用户的 AI 记忆伙伴。把以下记忆系统数据转化为一段温暖、自然、简洁的中文问候。"
                    f"不要列数字,不要用表格,像朋友一样聊天。控制在 200 字以内。"
                    f"\n\n原始数据:\n{raw_text}\n\n统计: {stats}"
                )
                if personality:
                    prompt += f"\n\n用户沟通风格偏好: {personality}"
                humanized = llm_client.generate(
                    prompt=prompt,
                    system="你是一个有温度的 AI 伙伴,语言简洁温暖,像朋友而非机器人。",
                    max_tokens=400,
                    temperature=0.7,
                )
                if humanized:
                    today = datetime.now().strftime("%Y-%m-%d")
                    return f"🧠 {today}\n\n{humanized}"
        except Exception as e:
            logger.debug("Humanized briefing failed, using raw: %s", e)
        return data.get("text", "无法生成简报")

    def _load_personality_style(self) -> str:
        """Read the user's communication-style preference from
        PERSONALITY.yaml, capped at 200 chars; empty string on failure."""
        try:
            import yaml
            p_path = _WORKSPACE / "PERSONALITY.yaml"
            if p_path.exists():
                data = yaml.safe_load(p_path.read_text(encoding="utf-8"))
                if isinstance(data, dict):
                    style = data.get("communication_style", data.get("沟通风格", ""))
                    return str(style)[:200] if style else ""
        except Exception:
            pass
        return ""

    def _task_collision(self):
        """Run insight collision; push only high-novelty (>=4) findings."""
        try:
            from second_brain.bridge import bridge as sb_bridge
            result = sb_bridge.collide()
            insights = result.get("insights", [])
            high_novelty = [i for i in insights if i.get("novelty", 0) >= 4]
            if high_novelty:
                text = "💡 *灵感碰撞有新发现:*\n\n"
                for ins in high_novelty[:2]:
                    text += (
                        f"*[{ins.get('strategy', '?')}]* (新颖度 {ins.get('novelty', 0)})\n"
                        f"联系: {ins.get('connection', '')[:120]}\n"
                        f"灵感: {ins.get('ideas', '')[:120]}\n\n"
                    )
                self._pusher.push(text)
        except Exception as e:
            logger.error("Collision task failed: %s", e)

    def _task_consolidate(self):
        """Trigger Chronos memory consolidation."""
        try:
            from chronos.bridge import bridge as chronos_bridge
            chronos_bridge.consolidate()
        except Exception as e:
            logger.error("Consolidate failed: %s", e)

    def _task_digest(self):
        """Run the 7-day memory digest."""
        try:
            from second_brain.digest import digest_memories
            digest_memories(days=7)
        except Exception as e:
            logger.error("Digest failed: %s", e)

    def _task_dormant_check(self):
        """Push up to 3 long-untouched memories as follow-up prompts."""
        try:
            from second_brain.bridge import bridge as sb_bridge
            result = sb_bridge.list_dormant()
            memories = result.get("memories", [])
            if memories:
                text = f"💤 *{len(memories)} 条记忆已经沉睡了:*\n\n"
                for m in memories[:3]:
                    days = m.get("dormant_days", 0)
                    content = m.get("content", "")[:80]
                    text += f"• ({days}天未提及) {content}\n"
                text += "\n需要跟进哪个?"
                self._pusher.push(text)
        except Exception as e:
            logger.error("Dormant check failed: %s", e)

    def _task_contradiction_scan(self):
        """Scan the KG for contradictions; push those with risk > 0.5."""
        try:
            from second_brain.inference import inference_engine
            reports = inference_engine.scan_contradictions()
            critical = [r for r in reports if r.risk_score > 0.5]
            if critical:
                text = f"⚠️ *发现 {len(critical)} 个高风险矛盾:*\n\n"
                for r in critical[:2]:
                    text += (
                        f"决策: {r.decision_content[:80]}\n"
                        f"风险: {r.risk_score:.0%}\n"
                        f"矛盾证据: {r.contradicting[0].get('content', '')[:80] if r.contradicting else '?'}\n\n"
                    )
                self._pusher.push(text)
        except Exception as e:
            logger.error("Contradiction scan failed: %s", e)

    def _task_blindspot_scan(self):
        """Detect decisions with unconsidered dimensions; push a summary."""
        try:
            from second_brain.inference import inference_engine
            reports = inference_engine.detect_all_blind_spots()
            if reports:
                text = f"🔍 *盲区扫描发现 {len(reports)} 个决策有未考虑的维度:*\n\n"
                for r in reports[:2]:
                    missing = r.missing_dimensions[:3] if hasattr(r, "missing_dimensions") else []
                    text += (
                        f"决策: {r.decision_content[:80]}\n"
                        f"遗漏: {', '.join(missing)}\n\n"
                    )
                self._pusher.push(text)
        except Exception as e:
            logger.error("Blindspot scan failed: %s", e)

    def _task_skill_proposal(self):
        """Scan the last 7 days for skill proposals; push any drafts found."""
        try:
            from second_brain.skill_proposer import proposer
            proposals = proposer.scan_and_propose(days=7)
            if proposals:
                text = f"🎯 *技能提名: {len(proposals)} 个新 draft skill*\n\n"
                for p in proposals[:2]:
                    scores = ", ".join(f"{k}={v}" for k, v in p.sources.items())
                    text += f"*{p.title}*\n分数: {scores}\n\n"
                text += "用 `memory-cli skills` 查看,`memory-cli skill-on <id>` 激活。"
                self._pusher.push(text)
        except Exception as e:
            logger.error("Skill proposal failed: %s", e)

    def start(self):
        """Launch the tick loop as a daemon thread: 60s startup grace,
        then one tick every 5 minutes."""
        def _loop():
            time.sleep(60)
            while True:
                try:
                    self.tick()
                except Exception as e:
                    logger.error("Scheduler tick error: %s", e, exc_info=True)
                time.sleep(300)
        t = threading.Thread(target=_loop, daemon=True, name="scheduler")
        t.start()
        logger.info("Scheduler started (%d tasks configured)", len(self._tasks))
def _track_access_async(results: list, query: str):
    """Record access events for returned search results (non-blocking).

    Spawns a daemon thread for the top 5 hits; all failures are swallowed
    so a tracking problem can never break the caller's search path.
    """
    import threading

    def _worker():
        try:
            from second_brain.bridge import bridge as sb_bridge
        except Exception:
            return
        try:
            for hit in results[:5]:
                sb_bridge.track_access(
                    memory_id=hit.get("metadata", {}).get("source", ""),
                    content=hit.get("content", ""),
                    query=query,
                )
        except Exception:
            pass

    threading.Thread(target=_worker, daemon=True).start()
def _kg_extract_async(content: str, importance: float):
    """Extract knowledge nodes/edges from new memory (non-blocking).

    If a contradiction is found during propagation, push to Telegram immediately.
    """
    import threading

    def _worker():
        try:
            from second_brain.relation_extractor import extractor
            result = extractor.extract(content, importance=importance)
            if result.skipped:
                return
            logger.info("KG extraction: %d nodes, %d edges, gain=%.2f",
                        len(result.new_nodes), len(result.new_edges),
                        result.structural_gain)
            # Feed the structural gain into skill proposal scoring.
            from second_brain.skill_proposer import save_kg_score
            save_kg_score(result.structural_gain, content[:200])
            # Propagate each new node; alert the user on contradictions.
            from second_brain.inference import inference_engine
            for node in result.new_nodes:
                for alert in inference_engine.propagate(node.id):
                    logger.info("KG propagation alert: %s", alert.message)
                    if "contradicts" in alert.message.lower():
                        _telegram.push(
                            f"⚡ *实时矛盾检测*\n\n{alert.message[:300]}"
                        )
        except Exception as e:
            logger.debug("KG extraction skipped: %s", e)

    threading.Thread(target=_worker, daemon=True).start()
class TaskManager:
    """In-memory async task queue for long-running operations."""

    def __init__(self, max_workers: int = 3):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._tasks: Dict[str, dict] = {}  # task_id -> mutable status record
        self._lock = threading.Lock()

    def submit(self, name: str, fn: Callable, *args, **kwargs) -> str:
        """Run fn(*args, **kwargs) on the pool; return a 12-char task id."""
        task_id = uuid.uuid4().hex[:12]
        record = {
            "id": task_id,
            "name": name,
            "status": "running",
            "submitted_at": time.time(),
            "result": None,
            "error": None,
            "completed_at": None,
        }
        with self._lock:
            self._tasks[task_id] = record

        def _run():
            try:
                outcome = fn(*args, **kwargs)
            except Exception as e:
                with self._lock:
                    record["status"] = "error"
                    record["error"] = str(e)
                    record["completed_at"] = time.time()
                logger.error("Task %s (%s) failed: %s", task_id, name, e, exc_info=True)
            else:
                with self._lock:
                    record["status"] = "done"
                    record["result"] = outcome
                    record["completed_at"] = time.time()
                elapsed = record["completed_at"] - record["submitted_at"]
                logger.info("Task %s (%s) completed in %.1fs", task_id, name, elapsed)

        self._pool.submit(_run)
        return task_id

    def get(self, task_id: str) -> Optional[dict]:
        """Return a status snapshot for task_id, or None if unknown.

        The snapshot includes "result" only when done and "error" only
        when failed; "elapsed" is seconds since submission, 0.1s precision.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return None
            snapshot = {
                "id": task["id"],
                "name": task["name"],
                "status": task["status"],
                "submitted_at": task["submitted_at"],
                "elapsed": round(
                    (task["completed_at"] or time.time()) - task["submitted_at"], 1
                ),
            }
            if task["status"] == "done":
                snapshot["result"] = task["result"]
            elif task["status"] == "error":
                snapshot["error"] = task["error"]
            return snapshot

    def list_tasks(self) -> list:
        """Return brief snapshots (no results) for every known task."""
        with self._lock:
            snapshots = []
            for task in self._tasks.values():
                entry = {
                    "id": task["id"],
                    "name": task["name"],
                    "status": task["status"],
                    "elapsed": round(
                        (task["completed_at"] or time.time()) - task["submitted_at"], 1
                    ),
                }
                if task["status"] == "error":
                    entry["error"] = task["error"]
                snapshots.append(entry)
            return snapshots

    def cleanup(self, max_age: float = 3600):
        """Drop finished tasks whose completion is older than max_age seconds."""
        now = time.time()
        with self._lock:
            expired = [
                tid for tid, t in self._tasks.items()
                if t["status"] in ("done", "error")
                and t["completed_at"]
                and now - t["completed_at"] > max_age
            ]
            for tid in expired:
                del self._tasks[tid]
        if expired:
            logger.info("Cleaned up %d expired tasks", len(expired))
_task_manager = TaskManager(max_workers=3)
# Endpoints expected to run for a long time — presumably dispatched through
# _task_manager instead of blocking the HTTP request thread (handler not
# visible in this chunk; confirm in the request-routing code).
_SLOW_ENDPOINTS = frozenset({
    "/second-brain/collide",
    "/second-brain/deep-collide",
    "/deep-recall",
    "/msa/interleave",
    "/chronos/consolidate",
    "/digest",
    "/kg/extract",
})
def _execute_endpoint(path: str, body: dict) -> dict:
"""Core dispatch logic. Stateless — safe to call from any thread."""
hub = _get_hub()
if path == "/remember":
return hub.remember(
content=body.get("content", ""),
source=body.get("source", "openclaw"),
importance=body.get("importance", 0.7),
tag=body.get("tag"),
doc_id=body.get("doc_id"),
title=body.get("title"),
force_systems=body.get("force_systems"),
)
elif path == "/recall":
return hub.recall(
query=body.get("query", ""),
top_k=body.get("top_k", 8),
max_tokens=body.get("max_tokens", 4000),
min_score=body.get("min_score", 0.45),
)
elif path == "/deep-recall":
return hub.deep_recall(
query=body.get("query", ""),
max_rounds=body.get("max_rounds", 3),
)
elif path == "/search":
from memora.vectorstore import vector_store
results = vector_store.search(
query=body.get("query", ""),
limit=body.get("limit", 8),
min_score=body.get("min_score", 0.0),
)
_track_access_async(results, body.get("query", ""))
return {"results": results}
elif path == "/add":
return hub.remember(
content=body.get("content", ""),
source=body.get("source", "cli"),
importance=body.get("importance", 0.7),
force_systems=["memora"],
skip_hooks=body.get("skip_hooks", False),
)
elif path == "/digest":
from second_brain.digest import digest_memories
days = body.get("days", 7)
ok = digest_memories(days=days)
return {"success": ok, "days": days}
elif path == "/chronos/learn":
from chronos.bridge import bridge as chronos_bridge
result = chronos_bridge.learn_and_save(
content=body.get("content", ""),
source=body.get("source", "openclaw"),
importance=body.get("importance", 0.75),
)
return {"importance": result.importance, "timestamp": result.timestamp.isoformat()}
elif path == "/chronos/consolidate":
from chronos.bridge import bridge as chronos_bridge
return chronos_bridge.consolidate()
elif path == "/msa/ingest":
from msa.bridge import bridge as msa_bridge
return msa_bridge.ingest_and_save(
content=body.get("content", ""),
source=body.get("source", "openclaw"),
doc_id=body.get("doc_id"),
metadata=body.get("metadata"),
)
elif path == "/msa/query":
from msa.bridge import bridge as msa_bridge
return msa_bridge.query_memory(
question=body.get("query", ""),
top_k=body.get("top_k"),
)
elif path == "/msa/interleave":
from msa.bridge import bridge as msa_bridge
return msa_bridge.interleave_query(
question=body.get("query", ""),
max_rounds=body.get("max_rounds"),
)
elif path == "/second-brain/collide":
from second_brain.bridge import bridge as sb_bridge
return sb_bridge.collide()
elif path == "/second-brain/deep-collide":
from second_brain.bridge import bridge as sb_bridge
return sb_bridge.deep_collide(topic=body.get("topic", ""))
elif path == "/inspect":
from second_brain.bridge import bridge as sb_bridge
return sb_bridge.memory_lifecycle(query=body.get("query", ""))
elif path == "/second-brain/track":
from second_brain.bridge import bridge as sb_bridge
sb_bridge.track_access(
memory_id=body.get("memory_id", ""),
content=body.get("content", ""),
query=body.get("query", ""),
)
return {"ok": True}
elif path == "/kg/extract":
from second_brain.relation_extractor import extractor
result = extractor.extract(
content=body.get("content", ""),
importance=body.get("importance", 0.5),
source_hash=body.get("source_hash", ""),
)
return {