diff --git a/cookbook/example_data_migration.ipynb b/cookbook/example_data_migration.ipynb index 2dc1e7a03..986bf95bf 100644 --- a/cookbook/example_data_migration.ipynb +++ b/cookbook/example_data_migration.ipynb @@ -314,6 +314,7 @@ "import json\n", "from langfuse import Langfuse\n", "# Corrected location for MapValue:\n", + "from langfuse.api import TraceWithFullDetails\n", "from langfuse.api.resources.commons.types import MapValue\n", "# Ingestion types:\n", "from langfuse.api.resources.ingestion.types import (\n", @@ -333,6 +334,149 @@ "from langfuse.api.resources.commons.types import ObservationLevel, ScoreSource, Usage\n", "from langfuse.api.resources.commons.types.score import Score_Numeric, Score_Categorical, Score_Boolean\n", "\n", + "# Helper function to migrate a single trace idempotently\n", + "def migrate_trace_by_id(\n", + " langfuse_source: Langfuse,\n", + " langfuse_destination: Langfuse,\n", + " trace_id: str,\n", + " sleep_between_gets: float,\n", + " sleep_between_batches: float,\n", + " max_retries: int\n", + "):\n", + " \"\"\"Migrate a single trace by ID from source to destination.\n", + "\n", + " Returns a tuple: (ok, failed_fetch, failed_transform, failed_push).\n", + " \"\"\"\n", + " if not trace_id:\n", + " return False, 0, 0, 0\n", + "\n", + " # Optional idempotency check: skip if trace already exists in destination\n", + " try:\n", + " langfuse_destination.api.trace.get(trace_id)\n", + " print(f\" Trace {trace_id} already exists in destination. Skipping migration.\")\n", + " return True, 0, 0, 0\n", + " except Exception as e:\n", + " if \"404\" not in str(e):\n", + " print(f\" Error checking trace {trace_id} in destination: {e}\")\n", + " return False, 0, 0, 1\n", + " # 404 -> proceed to migrate\n", + "\n", + " failed_fetch = 0\n", + " failed_transform = 0\n", + " failed_push = 0\n", + "\n", + " print(f\" Processing source trace ID: {trace_id}\")\n", + " source_trace_full = None\n", + " ingestion_batch = None\n", + "\n", + " # Fetch full trace details with retry\n", + " fetch_detail_success = False\n", + " detail_retries = 0\n", + " while not fetch_detail_success and detail_retries < max_retries:\n", + " current_sleep_get = sleep_between_gets * (2 ** detail_retries)\n", + " time.sleep(current_sleep_get)\n", + " try:\n", + " source_trace_full = langfuse_source.api.trace.get(trace_id)\n", + " fetch_detail_success = True\n", + " except Exception as e:\n", + " detail_retries += 1\n", + " print(f\" Error fetching details for trace {trace_id} (Attempt {detail_retries}/{max_retries}): {e}\")\n", + " if \"429\" in str(e):\n", + " sleep_time = 2 ** detail_retries\n", + " print(f\" Rate limit hit on get(). Sleeping for {sleep_time}s...\")\n", + " time.sleep(sleep_time)\n", + " elif detail_retries >= max_retries:\n", + " print(f\" Max retries reached fetching details for trace {trace_id}.\")\n", + " failed_fetch += 1\n", + " else:\n", + " # Non-429 error\n", + " print(f\" Non-rate-limit error fetching details for trace {trace_id}. Failing this trace.\")\n", + " failed_fetch += 1\n", + " break\n", + "\n", + " if not fetch_detail_success:\n", + " return False, failed_fetch, failed_transform, failed_push\n", + "\n", + " # Transform trace\n", + " try:\n", + " ingestion_batch = transform_trace_to_ingestion_batch(source_trace_full)\n", + " if not ingestion_batch:\n", + " print(f\" Skipping trace {trace_id} due to transformation error (returned empty batch).\")\n", + " failed_transform += 1\n", + " return False, failed_fetch, failed_transform, failed_push\n", + " # For error logging we keep a local map\n", + " current_batch_event_map = {event.id: event for event in ingestion_batch}\n", + " except Exception as e:\n", + " print(f\" Critical Error transforming trace {trace_id}: {e}\")\n", + " failed_transform += 1\n", + " return False, failed_fetch, failed_transform, failed_push\n", + "\n", + " # Push the batch with retry\n", + " push_success = False\n", + " push_retries = 0\n", + " while not push_success and push_retries < max_retries:\n", + " current_sleep_batch = sleep_between_batches * (2 ** push_retries)\n", + " time.sleep(current_sleep_batch)\n", + " try:\n", + " ingestion_response = langfuse_destination.api.ingestion.batch(batch=ingestion_batch)\n", + " push_success = True # Mark as attempted\n", + "\n", + " if ingestion_response.errors:\n", + " print(f\" Ingestion completed with errors for trace {trace_id}:\")\n", + " failed_push += 1 # Count trace as failed if any event fails\n", + " for i, error_detail in enumerate(ingestion_response.errors):\n", + " status = getattr(error_detail, \"status\", \"N/A\")\n", + " message = getattr(error_detail, \"message\", \"No message\")\n", + " failed_event_id = getattr(error_detail, \"id\", None)\n", + " failed_event = current_batch_event_map.get(failed_event_id) if failed_event_id else None\n", + "\n", + " print(f\" Error {i+1}: Status={status}, Message={message}\")\n", + " if failed_event:\n", + " print(f\" Failed Event Type: {getattr(failed_event, 'type', 'Unknown')}\")\n", + " try:\n", + " body_str = json.dumps(\n", + " failed_event.body.dict(),\n", + " indent=2,\n", + " default=str,\n", + " ensure_ascii=False,\n", + " )\n", + " print(\n", + " f\" Failed Event Body (truncated): {body_str[:1000]}\"\n", + " f\"{'...' if len(body_str) > 1000 else ''}\"\n", + " )\n", + " except Exception as dump_err:\n", + " print(f\" Failed Event Body: \")\n", + " else:\n", + " print(\n", + " f\" Failed Event ID: {failed_event_id} \"\n", + " f\"(Could not find matching event in batch)\"\n", + " )\n", + " break # Batch processed (with errors); do not retry\n", + " else:\n", + " print(f\" Successfully ingested trace {trace_id}\")\n", + " return True, failed_fetch, failed_transform, failed_push\n", + "\n", + " except Exception as e:\n", + " push_retries += 1\n", + " print(f\" Error pushing batch for trace {trace_id} (Attempt {push_retries}/{max_retries}): {e}\")\n", + " if \"429\" in str(e):\n", + " sleep_time = 2 ** push_retries\n", + " print(f\" Rate limit hit on batch(). Sleeping for {sleep_time}s...\")\n", + " time.sleep(sleep_time)\n", + " elif push_retries >= max_retries:\n", + " print(f\" Max retries reached pushing batch for trace {trace_id}.\")\n", + " failed_push += 1\n", + " else:\n", + " print(\n", + " f\" Non-rate-limit error pushing batch for trace {trace_id}. \"\n", + " f\"Failing this trace.\"\n", + " )\n", + " failed_push += 1\n", + " break\n", + "\n", + " # If we get here, push did not succeed cleanly\n", + " return False, failed_fetch, failed_transform, failed_push\n", + "\n", "# --- Helper Function for Robust Datetime Formatting ---\n", "def safe_isoformat(dt_obj):\n", " \"\"\"Safely formats datetime object to ISO 8601 string, handling None.\"\"\"\n", @@ -359,7 +503,7 @@ " print(f\"Warning: Could not format datetime {dt_obj}: {e}. Returning None.\")\n", " return None\n", "\n", - "def transform_trace_to_ingestion_batch(source_trace):\n", + "def transform_trace_to_ingestion_batch(source_trace: TraceWithFullDetails):\n", " \"\"\"\n", " Transforms a fetched TraceWithFullDetails object into a list of\n", " IngestionEvent objects suitable for the batch ingestion endpoint.\n", @@ -400,9 +544,6 @@ " # 2. Create Observation Events\n", " sorted_observations = sorted(source_trace.observations, key=lambda o: o.start_time)\n", " for source_obs in sorted_observations:\n", - " new_obs_id = str(uuid.uuid4())\n", - " obs_id_map[source_obs.id] = new_obs_id\n", - " new_parent_observation_id = obs_id_map.get(source_obs.parent_observation_id) if source_obs.parent_observation_id else None\n", " obs_metadata = source_obs.metadata if isinstance(source_obs.metadata, dict) else {}\n", "\n", " model_params_mapped = None\n", @@ -410,16 +551,23 @@ " elif source_obs.model_parameters is not None: print(f\"Warning: Obs {source_obs.id} model_parameters type {type(source_obs.model_parameters)}, skipping.\")\n", "\n", " common_body_args = {\n", - " \"id\": new_obs_id, \"trace_id\": preserved_trace_id, \"name\": source_obs.name,\n", - " \"start_time\": source_obs.start_time, \"metadata\": obs_metadata or None,\n", - " \"input\": source_obs.input, \"output\": source_obs.output, \"level\": source_obs.level,\n", - " \"status_message\": source_obs.status_message, \"parent_observation_id\": new_parent_observation_id,\n", - " \"version\": source_obs.version, \"environment\": source_obs.environment if source_obs.environment else \"default\",\n", + " \"id\": source_obs.id, \n", + " \"trace_id\": preserved_trace_id, \n", + " \"name\": source_obs.name,\n", + " \"start_time\": source_obs.start_time, \n", + " \"metadata\": obs_metadata or None,\n", + " \"input\": source_obs.input, \n", + " \"output\": source_obs.output, \n", + " \"level\": source_obs.level,\n", + " \"status_message\": source_obs.status_message, \n", + " \"parent_observation_id\": source_obs.parent_observation_id,\n", + " \"version\": source_obs.version, \n", + " \"environment\": source_obs.environment if source_obs.environment else \"default\",\n", " }\n", "\n", " event_body = None; ingestion_event_type = None\n", " event_specific_timestamp = safe_isoformat(dt.datetime.now(dt.timezone.utc))\n", - " if not event_specific_timestamp: print(f\"Error: Could not format timestamp for obs {new_obs_id}. Skipping.\"); continue\n", + " if not event_specific_timestamp: print(f\"Error: Could not format timestamp for obs {source_obs.id}. Skipping.\"); continue\n", "\n", " try:\n", " if source_obs.type == \"SPAN\":\n", @@ -446,7 +594,14 @@ " prompt_version=getattr(source_obs, 'prompt_version', None),\n", " )\n", " ingestion_event_type = IngestionEvent_GenerationCreate\n", - " else: print(f\"Warning: Unknown obs type '{source_obs.type}' for ID {source_obs.id}. Skipping.\"); continue\n", + " else:\n", + " print(f\"Warning: Unknown obs type '{source_obs.type}' for ID {source_obs.id}. Mapping as span.\")\n", + " # Map unsupported observation types to SPANs and record the original type in metadata\n", + " if \"original_observation_type\" not in obs_metadata:\n", + " obs_metadata[\"original_observation_type\"] = source_obs.type\n", + " common_body_args[\"metadata\"] = obs_metadata or None\n", + " event_body = CreateSpanBody(**common_body_args, end_time=source_obs.end_time)\n", + " ingestion_event_type = IngestionEvent_SpanCreate\n", "\n", " if event_body and ingestion_event_type:\n", " event_envelope_id = str(uuid.uuid4())\n", @@ -457,8 +612,6 @@ "\n", " # 3. Create Score Events\n", " for source_score in source_trace.scores:\n", - " new_score_id = str(uuid.uuid4())\n", - " new_observation_id = obs_id_map.get(source_score.observation_id) if source_score.observation_id else None\n", " score_metadata = source_score.metadata if isinstance(source_score.metadata, dict) else {}\n", "\n", " score_body_value = None\n", @@ -485,7 +638,7 @@ "\n", " try:\n", " score_body = ScoreBody(\n", - " id=new_score_id,\n", + " id=source_score.id,\n", " trace_id=preserved_trace_id,\n", " name=source_score.name,\n", " # Pass the correctly typed value\n", @@ -494,7 +647,7 @@ " # string_value=string_value if source_score.data_type == \"CATEGORICAL\" else None, # Optional: maybe pass string_value only for categorical?\n", " source=source_score.source,\n", " comment=source_score.comment,\n", - " observation_id=new_observation_id,\n", + " observation_id=source_score.observation_id,\n", " timestamp=source_score.timestamp,\n", " config_id=source_score.config_id,\n", " metadata=score_metadata or None,\n", @@ -502,7 +655,7 @@ " environment=source_score.environment if source_score.environment else \"default\",\n", " )\n", " event_timestamp_str = safe_isoformat(dt.datetime.now(dt.timezone.utc))\n", - " if not event_timestamp_str: print(f\"Error: Could not format timestamp for score {new_score_id}. Skipping.\"); continue\n", + " if not event_timestamp_str: print(f\"Error: Could not format timestamp for score {source_score.id}. Skipping.\"); continue\n", " event_envelope_id = str(uuid.uuid4())\n", " ingestion_events.append(\n", " IngestionEvent_ScoreCreate(id=event_envelope_id, timestamp=event_timestamp_str, body=score_body)\n", @@ -612,104 +765,23 @@ "\n", " print(f\" Fetched {len(trace_list.data)} trace summaries on page {trace_list.meta.page}/{getattr(trace_list.meta, 'total_pages', 'N/A')}.\")\n", "\n", - " # Store event details for better error reporting if batch fails\n", - " current_batch_event_map = {}\n", - "\n", " for trace_info in trace_list.data:\n", " source_trace_id = trace_info.id\n", - " print(f\" Processing source trace ID: {source_trace_id}\")\n", - " source_trace_full = None\n", - " ingestion_batch = None\n", - "\n", - " # Fetch full trace details with retry\n", - " fetch_detail_success = False\n", - " detail_retries = 0\n", - " while not fetch_detail_success and detail_retries < max_retries:\n", - " current_sleep_get = sleep_between_gets * (2 ** detail_retries)\n", - " time.sleep(current_sleep_get)\n", - " try:\n", - " source_trace_full = langfuse_source.api.trace.get(source_trace_id)\n", - " fetch_detail_success = True\n", - " except Exception as e:\n", - " detail_retries += 1\n", - " print(f\" Error fetching details for trace {source_trace_id} (Attempt {detail_retries}/{max_retries}): {e}\")\n", - " if \"429\" in str(e):\n", - " sleep_time = 2 ** detail_retries\n", - " print(f\" Rate limit hit on get(). Sleeping for {sleep_time}s...\")\n", - " time.sleep(sleep_time)\n", - " elif detail_retries >= max_retries:\n", - " print(f\" Max retries reached fetching details for trace {source_trace_id}.\")\n", - " total_failed_fetch += 1\n", - " else: # Non-429 error\n", - " print(f\" Non-rate-limit error fetching details for trace {source_trace_id}. Failing this trace.\")\n", - " total_failed_fetch += 1\n", - " break # Stop retrying this trace's fetch\n", - "\n", - " if not fetch_detail_success: continue # Skip to next trace\n", - "\n", - " # Transform trace\n", - " try:\n", - " ingestion_batch = transform_trace_to_ingestion_batch(source_trace_full)\n", - " if not ingestion_batch:\n", - " print(f\" Skipping trace {source_trace_id} due to transformation error (returned empty batch).\")\n", - " total_failed_transform += 1\n", - " continue\n", - " # Store event details for potential error logging\n", - " current_batch_event_map = {event.id: event for event in ingestion_batch}\n", - " except Exception as e:\n", - " print(f\" Critical Error transforming trace {source_trace_id}: {e}\")\n", - " total_failed_transform += 1\n", - " continue # Skip to next trace\n", - "\n", - " # Push the batch with retry\n", - " push_success = False\n", - " push_retries = 0\n", - " while not push_success and push_retries < max_retries:\n", - " current_sleep_batch = sleep_between_batches * (2 ** push_retries)\n", - " time.sleep(current_sleep_batch)\n", - " try:\n", - " ingestion_response = langfuse_destination.api.ingestion.batch(batch=ingestion_batch)\n", - " push_success = True # Mark as attempted\n", - "\n", - " if ingestion_response.errors:\n", - " print(f\" Ingestion completed with errors for trace {source_trace_id}:\")\n", - " total_failed_push += 1 # Count trace as failed if any event fails\n", - " for i, error_detail in enumerate(ingestion_response.errors):\n", - " status = getattr(error_detail, 'status', 'N/A')\n", - " message = getattr(error_detail, 'message', 'No message')\n", - " failed_event_id = getattr(error_detail, 'id', None) # Use the ID from the error\n", - " failed_event = current_batch_event_map.get(failed_event_id) if failed_event_id else None\n", - "\n", - " print(f\" Error {i+1}: Status={status}, Message={message}\")\n", - " if failed_event:\n", - " print(f\" Failed Event Type: {getattr(failed_event, 'type', 'Unknown')}\")\n", - " try:\n", - " body_str = json.dumps(failed_event.body.dict(), indent=2, default=str, ensure_ascii=False)\n", - " print(f\" Failed Event Body (truncated): {body_str[:1000]}{'...' if len(body_str) > 1000 else ''}\")\n", - " except Exception as dump_err: print(f\" Failed Event Body: \")\n", - " else: print(f\" Failed Event ID: {failed_event_id} (Could not find matching event in batch)\")\n", - " break # Break retry loop even if errors occurred, as batch was processed partially\n", - " else:\n", - " print(f\" Successfully ingested trace {source_trace_id}\")\n", - " total_migrated += 1\n", - "\n", - " except Exception as e:\n", - " push_retries += 1\n", - " print(f\" Error pushing batch for trace {source_trace_id} (Attempt {push_retries}/{max_retries}): {e}\")\n", - " if \"429\" in str(e):\n", - " sleep_time = 2 ** push_retries\n", - " print(f\" Rate limit hit on batch(). Sleeping for {sleep_time}s...\")\n", - " time.sleep(sleep_time)\n", - " elif push_retries >= max_retries:\n", - " print(f\" Max retries reached pushing batch for trace {source_trace_id}.\")\n", - " total_failed_push += 1\n", - " else: # Non-429 error during push attempt\n", - " print(f\" Non-rate-limit error pushing batch for trace {source_trace_id}. Failing this trace.\")\n", - " total_failed_push += 1\n", - " break # Stop retrying push for this trace\n", - "\n", - " # Ensure loop eventually terminates if push fails after retries\n", - " if not push_success and push_retries >= max_retries: continue\n", + "\n", + " ok, failed_fetch, failed_transform, failed_push = migrate_trace_by_id(\n", + " langfuse_source=langfuse_source,\n", + " langfuse_destination=langfuse_destination,\n", + " trace_id=source_trace_id,\n", + " sleep_between_gets=sleep_between_gets,\n", + " sleep_between_batches=sleep_between_batches,\n", + " max_retries=max_retries\n", + " )\n", + "\n", + " if ok:\n", + " total_migrated += 1\n", + " total_failed_fetch += failed_fetch\n", + " total_failed_transform += failed_transform\n", + " total_failed_push += failed_push\n", "\n", "\n", " current_page_meta = getattr(trace_list.meta, 'page', page)\n", @@ -732,7 +804,7 @@ " print(\"Langfuse Traces Migration Script\")\n", " print(\"--------------------------------\")\n", " print(\"WARNING: Migrates full trace data. PRESERVES ORIGINAL TRACE IDS.\")\n", - " print(\"Ensure no ID collisions in the destination project!\")\n", + " print(\"Traces with the same id in the destination will not be replaced\")\n", " print(\"Includes retries with exponential backoff for rate limiting.\")\n", " print(\"--------------------------------\\n\")\n", "\n", @@ -797,9 +869,10 @@ "import os\n", "import sys\n", "import time\n", - "import uuid\n", + "import urllib.parse\n", "import datetime as dt\n", "from langfuse import Langfuse\n", + "from langfuse.api import DatasetRun\n", "from langfuse.api.resources.datasets.types import CreateDatasetRequest\n", "from langfuse.api.resources.dataset_items.types import CreateDatasetItemRequest\n", "from langfuse.api.resources.dataset_run_items.types import CreateDatasetRunItemRequest\n", @@ -823,7 +896,7 @@ " return None\n", "\n", "# --- Main Migration Function ---\n", - "def migrate_datasets(source_config, dest_config, sleep_between_calls=0.4):\n", + "def migrate_datasets(source_config, dest_config, sleep_between_calls, from_ts, to_ts):\n", " \"\"\"\n", " Migrates Datasets, Dataset Items, and Dataset Run Items from source to destination.\n", " ASSUMES Traces/Observations were previously migrated PRESERVING ORIGINAL IDs.\n", @@ -881,7 +954,7 @@ " if \"404\" in str(get_err):\n", " print(f\" Dataset '{source_dataset.name}' not found in destination. Creating...\")\n", " create_ds_req = CreateDatasetRequest(\n", - " name=source_dataset.name, description=source_dataset.description, metadata=source_dataset.metadata\n", + " name=source_dataset.name, description=source_dataset.description, metadata=source_dataset.metadata\n", " )\n", " time.sleep(sleep_between_calls)\n", " created_dataset = langfuse_destination.api.datasets.create(request=create_ds_req)\n", @@ -928,15 +1001,24 @@ " print(f\" Finished processing items for dataset '{source_dataset.name}'. Adding short delay before processing runs.\")\n", " time.sleep(sleep_between_calls * 2)\n", "\n", - " # 3. Migrate Dataset Run Items\n", + " # 3. Migrate Dataset Run Items (and ensure related traces exist in destination)\n", " print(f\" Fetching runs for dataset '{source_dataset.name}'...\")\n", " page_run = 1; limit_run = 100\n", + "\n", + " # Reuse trace migration backoff settings if configured\n", + " sleep_between_gets = float(os.getenv(\"LANGFUSE_MIGRATE_SLEEP_GET\", sleep_between_calls))\n", + " sleep_between_batches = float(os.getenv(\"LANGFUSE_MIGRATE_SLEEP_BATCH\", sleep_between_calls))\n", + " max_retries = int(os.getenv(\"LANGFUSE_MIGRATE_MAX_RETRIES\", 4))\n", + "\n", + " # First, collect all runs for this dataset (respecting the global time window)\n", + " runs_to_recreate : list[DatasetRun] = []\n", " while True:\n", " # print(f\" Fetching page {page_run} of runs metadata...\") # Less verbose logging\n", " time.sleep(sleep_between_calls)\n", " try:\n", + " #get_runs improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", " runs_list = langfuse_source.api.datasets.get_runs(\n", - " dataset_name=source_dataset.name, page=page_run, limit=limit_run\n", + " dataset_name=urllib.parse.quote(source_dataset.name), page=page_run, limit=limit_run\n", " )\n", " if not runs_list.data: break # Done with runs for this dataset\n", " # print(f\" Fetched {len(runs_list.data)} runs metadata.\") # Less verbose logging\n", @@ -944,36 +1026,70 @@ "\n", " for source_run_summary in runs_list.data:\n", " print(f\" Processing run: '{source_run_summary.name}'\")\n", - " try:\n", - " time.sleep(sleep_between_calls)\n", - " source_run_full = langfuse_source.api.datasets.get_run(\n", - " dataset_name=source_dataset.name, run_name=source_run_summary.name\n", - " )\n", - " except Exception as e: print(f\" Error fetching full details for run '{source_run_summary.name}': {e}\"); run_links_failed += 1; continue\n", "\n", - " for source_run_item in source_run_full.dataset_run_items:\n", - " new_dest_item_id = item_id_map.get(source_run_item.dataset_item_id)\n", - " if not new_dest_item_id: print(f\" Warning: Could not find dest mapping for source item ID {source_run_item.dataset_item_id} in run '{source_run_summary.name}'. Skipping link.\"); run_links_failed += 1; continue\n", - " if not source_run_item.trace_id and not source_run_item.observation_id: print(f\" Warning: Source run item for item {source_run_item.dataset_item_id} lacks trace/observation ID. Skipping link.\"); run_links_failed += 1; continue\n", + " # Respect global migration time window for dataset runs, if configured\n", + " if from_ts and source_run_summary.created_at < from_ts:\n", + " print(f\" Skipping run {source_run_summary.name} as it is before the date threshold\") \n", + " continue\n", + " if to_ts and source_run_summary.created_at > to_ts:\n", + " print(f\" Skipping run {source_run_summary.name} as it is after the date threshold\")\n", + " continue\n", + " runs_to_recreate.append(source_run_summary)\n", + "\n", + " if runs_list.meta.page >= getattr(runs_list.meta, 'total_pages', page_run):\n", + " break\n", + "\n", + " page_run += 1\n", + " time.sleep(sleep_between_calls)\n", + "\n", + " # Sort runs by ascending creation time so oldest runs are recreated first and the relative order is conserved\n", + " runs_to_recreate.sort(key=lambda run: run.created_at)\n", "\n", - " run_metadata = source_run_summary.metadata # Pass original run metadata\n", + " print(f\" Recreating '{len(runs_to_recreate)}' runs for dataset '{source_dataset.name}'\")\n", "\n", - " create_run_item_req = CreateDatasetRunItemRequest(\n", - " run_name=source_run_summary.name,\n", - " dataset_item_id=new_dest_item_id,\n", + " for index, source_run_summary in enumerate(runs_to_recreate):\n", + " print(f\" Processing run '{index}'/'{len(runs_to_recreate)}': '{source_run_summary.name}'\")\n", + " try:\n", + " time.sleep(sleep_between_calls)\n", + " # get_run improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", + " source_run_full = langfuse_source.api.datasets.get_run(\n", + " dataset_name=urllib.parse.quote(source_dataset.name), run_name=urllib.parse.quote(source_run_summary.name)\n", + " )\n", + " except Exception as e: print(f\" Error fetching full details for run '{source_run_summary.name}': {e}\"); run_links_failed += 1; continue\n", + "\n", + " for source_run_item in source_run_full.dataset_run_items:\n", + " new_dest_item_id = item_id_map.get(source_run_item.dataset_item_id)\n", + " if not new_dest_item_id: print(f\" Warning: Could not find dest mapping for source item ID {source_run_item.dataset_item_id} in run '{source_run_summary.name}'. Skipping link.\"); run_links_failed += 1; continue\n", + " if not source_run_item.trace_id and not source_run_item.observation_id: print(f\" Warning: Source run item for item {source_run_item.dataset_item_id} lacks trace/observation ID. Skipping link.\"); run_links_failed += 1; continue\n", + "\n", + " # Ensure the related trace is migrated before creating the run link\n", + " if source_run_item.trace_id:\n", + " ok, _, _, _ = migrate_trace_by_id(\n", + " langfuse_source=langfuse_source,\n", + " langfuse_destination=langfuse_destination,\n", " trace_id=source_run_item.trace_id,\n", - " observation_id=source_run_item.observation_id,\n", - " run_description=source_run_summary.description,\n", - " metadata=run_metadata or None # Pass original run metadata here\n", + " sleep_between_gets=sleep_between_gets,\n", + " sleep_between_batches=sleep_between_batches,\n", + " max_retries=max_retries\n", " )\n", - " try:\n", - " time.sleep(sleep_between_calls)\n", - " langfuse_destination.api.dataset_run_items.create(request=create_run_item_req)\n", - " run_links_created += 1\n", - " except Exception as e: print(f\" Error creating run item link for dest item {new_dest_item_id} in run '{source_run_summary.name}': {e}\"); run_links_failed += 1\n", - "\n", - " if runs_list.meta.page >= getattr(runs_list.meta, 'total_pages', page_run): break\n", - " page_run += 1\n", + " if not ok:\n", + " print(f\" Warning: Failed to migrate trace {source_run_item.trace_id} for dataset run '{source_run_summary.name}'. Run link may be incomplete.\")\n", + "\n", + " run_metadata = source_run_summary.metadata # Pass original run metadata\n", + "\n", + " create_run_item_req = CreateDatasetRunItemRequest(\n", + " run_name=source_run_summary.name,\n", + " dataset_item_id=new_dest_item_id,\n", + " trace_id=source_run_item.trace_id,\n", + " observation_id=source_run_item.observation_id,\n", + " run_description=source_run_summary.description,\n", + " metadata=run_metadata or None # Pass original run metadata here\n", + " )\n", + " try:\n", + " time.sleep(sleep_between_calls)\n", + " langfuse_destination.api.dataset_run_items.create(request=create_run_item_req)\n", + " run_links_created += 1\n", + " except Exception as e: print(f\" Error creating run item link for dest item {new_dest_item_id} in run '{source_run_summary.name}': {e}\"); run_links_failed += 1\n", "\n", " if datasets_list.meta.page >= getattr(datasets_list.meta, 'total_pages', page_ds): print(\"Processed the last page of datasets.\"); break\n", " page_ds += 1\n", @@ -1005,6 +1121,18 @@ " if not source_pk or not source_sk: print(\"Error: Source credentials env vars required.\"); sys.exit(1)\n", " if not dest_pk or not dest_sk: print(\"Error: Destination credentials env vars required.\"); sys.exit(1)\n", "\n", + " # Optional time filters for dataset runs (aligned with trace migration)\n", + " from_ts_str = os.getenv(\"LANGFUSE_MIGRATE_FROM_TIMESTAMP\")\n", + " to_ts_str = os.getenv(\"LANGFUSE_MIGRATE_TO_TIMESTAMP\")\n", + " from_ts = parse_datetime(from_ts_str) if from_ts_str else None\n", + " to_ts = parse_datetime(to_ts_str) if to_ts_str else None\n", + " if from_ts_str and not from_ts:\n", + " print(f\"Error: Invalid LANGFUSE_MIGRATE_FROM_TIMESTAMP='{from_ts_str}'. Skipping time filtering for dataset runs.\")\n", + " from_ts = None\n", + " if to_ts_str and not to_ts:\n", + " print(f\"Error: Invalid LANGFUSE_MIGRATE_TO_TIMESTAMP='{to_ts_str}'. Skipping time filtering for dataset runs.\")\n", + " to_ts = None\n", + "\n", " source_credentials = {\"public_key\": source_pk, \"secret_key\": source_sk, \"host\": source_host}\n", " destination_credentials = {\"public_key\": dest_pk, \"secret_key\": dest_sk, \"host\": dest_host}\n", "\n", @@ -1013,7 +1141,7 @@ "\n", " confirmation = input(\"\\nProceed with dataset migration (ASSUMING Trace IDs were preserved)? (yes/no): \").lower()\n", " if confirmation == 'yes':\n", - " migrate_datasets(source_credentials, destination_credentials, sleep_calls)\n", + " migrate_datasets(source_credentials, destination_credentials, sleep_calls, from_ts, to_ts)\n", " else: print(\"Migration cancelled by user.\")\n" ] },