diff --git a/backend/app/services/graph_builder.py b/backend/app/services/graph_builder.py index 0e0444bf3..56e51bc8b 100644 --- a/backend/app/services/graph_builder.py +++ b/backend/app/services/graph_builder.py @@ -184,20 +184,29 @@ def _build_graph_worker( error_msg = f"{str(e)}\n{traceback.format_exc()}" self.task_manager.fail_task(task_id, error_msg) - def create_graph(self, name: str) -> str: - """创建Zep图谱(公开方法)""" + def create_graph(self, name: str, max_retries: int = 3) -> str: + """创建Zep图谱(公开方法,带重试机制)""" graph_id = f"mirofish_{uuid.uuid4().hex[:16]}" - self.client.graph.create( - graph_id=graph_id, - name=name, - description="MiroFish Social Simulation Graph" - ) - - return graph_id + for attempt in range(max_retries): + try: + self.client.graph.create( + graph_id=graph_id, + name=name, + description="MiroFish Social Simulation Graph" + ) + return graph_id + except Exception as e: + if attempt < max_retries - 1: + wait_time = (attempt + 1) * 2 # 2秒, 4秒, 6秒... + print(f"创建图谱失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") + print(f"等待 {wait_time} 秒后重试...") + time.sleep(wait_time) + else: + raise - def set_ontology(self, graph_id: str, ontology: Dict[str, Any]): - """设置图谱本体(公开方法)""" + def set_ontology(self, graph_id: str, ontology: Dict[str, Any], max_retries: int = 3): + """设置图谱本体(公开方法,带重试机制)""" import warnings from typing import Optional from pydantic import Field @@ -277,13 +286,24 @@ def safe_attr_name(attr_name: str) -> str: if source_targets: edge_definitions[name] = (edge_class, source_targets) - # 调用Zep API设置本体 + # 调用Zep API设置本体(带重试) if entity_types or edge_definitions: - self.client.graph.set_ontology( - graph_ids=[graph_id], - entities=entity_types if entity_types else None, - edges=edge_definitions if edge_definitions else None, - ) + for attempt in range(max_retries): + try: + self.client.graph.set_ontology( + graph_ids=[graph_id], + entities=entity_types if entity_types else None, + edges=edge_definitions if edge_definitions else None, + ) + break + except Exception as e: + if attempt < max_retries - 1: + wait_time = (attempt + 1) * 2 + print(f"设置本体失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") + print(f"等待 {wait_time} 秒后重试...") + time.sleep(wait_time) + else: + raise def add_text_batches( self, @@ -314,27 +334,39 @@ def add_text_batches( for chunk in batch_chunks ] - # 发送到Zep - try: - batch_result = self.client.graph.add_batch( - graph_id=graph_id, - episodes=episodes - ) - - # 收集返回的 episode uuid - if batch_result and isinstance(batch_result, list): - for ep in batch_result: - ep_uuid = getattr(ep, 'uuid_', None) or getattr(ep, 'uuid', None) - if ep_uuid: - episode_uuids.append(ep_uuid) - - # 避免请求过快 - time.sleep(1) - - except Exception as e: - if progress_callback: - progress_callback(f"批次 {batch_num} 发送失败: {str(e)}", 0) - raise + # 发送到Zep(带重试) + max_retries = 3 + for attempt in range(max_retries): + try: + batch_result = self.client.graph.add_batch( + graph_id=graph_id, + episodes=episodes + ) + + # 收集返回的 episode uuid + if batch_result and isinstance(batch_result, list): + for ep in batch_result: + ep_uuid = getattr(ep, 'uuid_', None) or getattr(ep, 'uuid', None) + if ep_uuid: + episode_uuids.append(ep_uuid) + + # 避免请求过快 + time.sleep(1) + break + + except Exception as e: + if attempt < max_retries - 1: + wait_time = (attempt + 1) * 2 + if progress_callback: + progress_callback( + f"批次 {batch_num} 发送失败,{wait_time}秒后重试 ({attempt + 1}/{max_retries})...", + (i + len(batch_chunks)) / total_chunks + ) + time.sleep(wait_time) + else: + if progress_callback: + progress_callback(f"批次 {batch_num} 发送失败: {str(e)}", 0) + raise return episode_uuids