From 3fdb5cf9ff3928a3ce2592349ea3fecc1f243b78 Mon Sep 17 00:00:00 2001 From: ouvreboite Date: Mon, 2 Mar 2026 13:51:31 +0100 Subject: [PATCH 1/5] [Data migration script] Handle tools/agent/... observat --- cookbook/example_data_migration.ipynb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cookbook/example_data_migration.ipynb b/cookbook/example_data_migration.ipynb index 2dc1e7a035..446262b01f 100644 --- a/cookbook/example_data_migration.ipynb +++ b/cookbook/example_data_migration.ipynb @@ -446,7 +446,14 @@ " prompt_version=getattr(source_obs, 'prompt_version', None),\n", " )\n", " ingestion_event_type = IngestionEvent_GenerationCreate\n", - " else: print(f\"Warning: Unknown obs type '{source_obs.type}' for ID {source_obs.id}. Skipping.\"); continue\n", + " else:\n", + " print(f\"Warning: Unknown obs type '{source_obs.type}' for ID {source_obs.id}. Mapping as span.\")\n", + " # Map unsupported observation types to SPANs and record the original type in metadata\n", + " if \"original_observation_type\" not in obs_metadata:\n", + " obs_metadata[\"original_observation_type\"] = source_obs.type\n", + " common_body_args[\"metadata\"] = obs_metadata or None\n", + " event_body = CreateSpanBody(**common_body_args, end_time=source_obs.end_time)\n", + " ingestion_event_type = IngestionEvent_SpanCreate\n", "\n", " if event_body and ingestion_event_type:\n", " event_envelope_id = str(uuid.uuid4())\n", @@ -797,7 +804,6 @@ "import os\n", "import sys\n", "import time\n", - "import uuid\n", "import datetime as dt\n", "from langfuse import Langfuse\n", "from langfuse.api.resources.datasets.types import CreateDatasetRequest\n", From 4d7a3c08c56a03e15e1d7dc5c3f296e423f6bf73 Mon Sep 17 00:00:00 2001 From: ouvreboite Date: Mon, 2 Mar 2026 15:52:56 +0100 Subject: [PATCH 2/5] [Data migration script] Ensure run items' traces are migrated --- cookbook/example_data_migration.ipynb | 286 +++++++++++++++++--------- 1 file changed, 185 insertions(+), 101 deletions(-) diff --git a/cookbook/example_data_migration.ipynb b/cookbook/example_data_migration.ipynb index 446262b01f..530eb26625 100644 --- a/cookbook/example_data_migration.ipynb +++ b/cookbook/example_data_migration.ipynb @@ -333,6 +333,149 @@ "from langfuse.api.resources.commons.types import ObservationLevel, ScoreSource, Usage\n", "from langfuse.api.resources.commons.types.score import Score_Numeric, Score_Categorical, Score_Boolean\n", "\n", + "# Helper function to migrate a single trace idempotently\n", + "def migrate_trace_by_id(\n", + " langfuse_source: Langfuse,\n", + " langfuse_destination: Langfuse,\n", + " trace_id: str,\n", + " sleep_between_gets: float,\n", + " sleep_between_batches: float,\n", + " max_retries: int\n", + "):\n", + " \"\"\"Migrate a single trace by ID from source to destination.\n", + "\n", + " Returns a tuple: (ok, failed_fetch, failed_transform, failed_push).\n", + " \"\"\"\n", + " if not trace_id:\n", + " return False, 0, 0, 0\n", + "\n", + " # Optional idempotency check: skip if trace already exists in destination\n", + " try:\n", + " langfuse_destination.api.trace.get(trace_id)\n", + " print(f\" Trace {trace_id} already exists in destination. Skipping migration.\")\n", + " return True, 0, 0, 0\n", + " except Exception as e:\n", + " if \"404\" not in str(e):\n", + " print(f\" Error checking trace {trace_id} in destination: {e}\")\n", + " return False, 0, 0, 1\n", + " # 404 -> proceed to migrate\n", + "\n", + " failed_fetch = 0\n", + " failed_transform = 0\n", + " failed_push = 0\n", + "\n", + " print(f\" Processing source trace ID: {trace_id}\")\n", + " source_trace_full = None\n", + " ingestion_batch = None\n", + "\n", + " # Fetch full trace details with retry\n", + " fetch_detail_success = False\n", + " detail_retries = 0\n", + " while not fetch_detail_success and detail_retries < max_retries:\n", + " current_sleep_get = sleep_between_gets * (2 ** detail_retries)\n", + " time.sleep(current_sleep_get)\n", + " try:\n", + " source_trace_full = langfuse_source.api.trace.get(trace_id)\n", + " fetch_detail_success = True\n", + " except Exception as e:\n", + " detail_retries += 1\n", + " print(f\" Error fetching details for trace {trace_id} (Attempt {detail_retries}/{max_retries}): {e}\")\n", + " if \"429\" in str(e):\n", + " sleep_time = 2 ** detail_retries\n", + " print(f\" Rate limit hit on get(). Sleeping for {sleep_time}s...\")\n", + " time.sleep(sleep_time)\n", + " elif detail_retries >= max_retries:\n", + " print(f\" Max retries reached fetching details for trace {trace_id}.\")\n", + " failed_fetch += 1\n", + " else:\n", + " # Non-429 error\n", + " print(f\" Non-rate-limit error fetching details for trace {trace_id}. Failing this trace.\")\n", + " failed_fetch += 1\n", + " break\n", + "\n", + " if not fetch_detail_success:\n", + " return False, failed_fetch, failed_transform, failed_push\n", + "\n", + " # Transform trace\n", + " try:\n", + " ingestion_batch = transform_trace_to_ingestion_batch(source_trace_full)\n", + " if not ingestion_batch:\n", + " print(f\" Skipping trace {trace_id} due to transformation error (returned empty batch).\")\n", + " failed_transform += 1\n", + " return False, failed_fetch, failed_transform, failed_push\n", + " # For error logging we keep a local map\n", + " current_batch_event_map = {event.id: event for event in ingestion_batch}\n", + " except Exception as e:\n", + " print(f\" Critical Error transforming trace {trace_id}: {e}\")\n", + " failed_transform += 1\n", + " return False, failed_fetch, failed_transform, failed_push\n", + "\n", + " # Push the batch with retry\n", + " push_success = False\n", + " push_retries = 0\n", + " while not push_success and push_retries < max_retries:\n", + " current_sleep_batch = sleep_between_batches * (2 ** push_retries)\n", + " time.sleep(current_sleep_batch)\n", + " try:\n", + " ingestion_response = langfuse_destination.api.ingestion.batch(batch=ingestion_batch)\n", + " push_success = True # Mark as attempted\n", + "\n", + " if ingestion_response.errors:\n", + " print(f\" Ingestion completed with errors for trace {trace_id}:\")\n", + " failed_push += 1 # Count trace as failed if any event fails\n", + " for i, error_detail in enumerate(ingestion_response.errors):\n", + " status = getattr(error_detail, \"status\", \"N/A\")\n", + " message = getattr(error_detail, \"message\", \"No message\")\n", + " failed_event_id = getattr(error_detail, \"id\", None)\n", + " failed_event = current_batch_event_map.get(failed_event_id) if failed_event_id else None\n", + "\n", + " print(f\" Error {i+1}: Status={status}, Message={message}\")\n", + " if failed_event:\n", + " print(f\" Failed Event Type: {getattr(failed_event, 'type', 'Unknown')}\")\n", + " try:\n", + " body_str = json.dumps(\n", + " failed_event.body.dict(),\n", + " indent=2,\n", + " default=str,\n", + " ensure_ascii=False,\n", + " )\n", + " print(\n", + " f\" Failed Event Body (truncated): {body_str[:1000]}\"\n", + " f\"{'...' if len(body_str) > 1000 else ''}\"\n", + " )\n", + " except Exception as dump_err:\n", + " print(f\" Failed Event Body: \")\n", + " else:\n", + " print(\n", + " f\" Failed Event ID: {failed_event_id} \"\n", + " f\"(Could not find matching event in batch)\"\n", + " )\n", + " break # Batch processed (with errors); do not retry\n", + " else:\n", + " print(f\" Successfully ingested trace {trace_id}\")\n", + " return True, failed_fetch, failed_transform, failed_push\n", + "\n", + " except Exception as e:\n", + " push_retries += 1\n", + " print(f\" Error pushing batch for trace {trace_id} (Attempt {push_retries}/{max_retries}): {e}\")\n", + " if \"429\" in str(e):\n", + " sleep_time = 2 ** push_retries\n", + " print(f\" Rate limit hit on batch(). Sleeping for {sleep_time}s...\")\n", + " time.sleep(sleep_time)\n", + " elif push_retries >= max_retries:\n", + " print(f\" Max retries reached pushing batch for trace {trace_id}.\")\n", + " failed_push += 1\n", + " else:\n", + " print(\n", + " f\" Non-rate-limit error pushing batch for trace {trace_id}. \"\n", + " f\"Failing this trace.\"\n", + " )\n", + " failed_push += 1\n", + " break\n", + "\n", + " # If we get here, push did not succeed cleanly\n", + " return False, failed_fetch, failed_transform, failed_push\n", + "\n", "# --- Helper Function for Robust Datetime Formatting ---\n", "def safe_isoformat(dt_obj):\n", " \"\"\"Safely formats datetime object to ISO 8601 string, handling None.\"\"\"\n", @@ -619,104 +762,23 @@ "\n", " print(f\" Fetched {len(trace_list.data)} trace summaries on page {trace_list.meta.page}/{getattr(trace_list.meta, 'total_pages', 'N/A')}.\")\n", "\n", - " # Store event details for better error reporting if batch fails\n", - " current_batch_event_map = {}\n", - "\n", " for trace_info in trace_list.data:\n", " source_trace_id = trace_info.id\n", - " print(f\" Processing source trace ID: {source_trace_id}\")\n", - " source_trace_full = None\n", - " ingestion_batch = None\n", - "\n", - " # Fetch full trace details with retry\n", - " fetch_detail_success = False\n", - " detail_retries = 0\n", - " while not fetch_detail_success and detail_retries < max_retries:\n", - " current_sleep_get = sleep_between_gets * (2 ** detail_retries)\n", - " time.sleep(current_sleep_get)\n", - " try:\n", - " source_trace_full = langfuse_source.api.trace.get(source_trace_id)\n", - " fetch_detail_success = True\n", - " except Exception as e:\n", - " detail_retries += 1\n", - " print(f\" Error fetching details for trace {source_trace_id} (Attempt {detail_retries}/{max_retries}): {e}\")\n", - " if \"429\" in str(e):\n", - " sleep_time = 2 ** detail_retries\n", - " print(f\" Rate limit hit on get(). Sleeping for {sleep_time}s...\")\n", - " time.sleep(sleep_time)\n", - " elif detail_retries >= max_retries:\n", - " print(f\" Max retries reached fetching details for trace {source_trace_id}.\")\n", - " total_failed_fetch += 1\n", - " else: # Non-429 error\n", - " print(f\" Non-rate-limit error fetching details for trace {source_trace_id}. Failing this trace.\")\n", - " total_failed_fetch += 1\n", - " break # Stop retrying this trace's fetch\n", - "\n", - " if not fetch_detail_success: continue # Skip to next trace\n", - "\n", - " # Transform trace\n", - " try:\n", - " ingestion_batch = transform_trace_to_ingestion_batch(source_trace_full)\n", - " if not ingestion_batch:\n", - " print(f\" Skipping trace {source_trace_id} due to transformation error (returned empty batch).\")\n", - " total_failed_transform += 1\n", - " continue\n", - " # Store event details for potential error logging\n", - " current_batch_event_map = {event.id: event for event in ingestion_batch}\n", - " except Exception as e:\n", - " print(f\" Critical Error transforming trace {source_trace_id}: {e}\")\n", - " total_failed_transform += 1\n", - " continue # Skip to next trace\n", - "\n", - " # Push the batch with retry\n", - " push_success = False\n", - " push_retries = 0\n", - " while not push_success and push_retries < max_retries:\n", - " current_sleep_batch = sleep_between_batches * (2 ** push_retries)\n", - " time.sleep(current_sleep_batch)\n", - " try:\n", - " ingestion_response = langfuse_destination.api.ingestion.batch(batch=ingestion_batch)\n", - " push_success = True # Mark as attempted\n", - "\n", - " if ingestion_response.errors:\n", - " print(f\" Ingestion completed with errors for trace {source_trace_id}:\")\n", - " total_failed_push += 1 # Count trace as failed if any event fails\n", - " for i, error_detail in enumerate(ingestion_response.errors):\n", - " status = getattr(error_detail, 'status', 'N/A')\n", - " message = getattr(error_detail, 'message', 'No message')\n", - " failed_event_id = getattr(error_detail, 'id', None) # Use the ID from the error\n", - " failed_event = current_batch_event_map.get(failed_event_id) if failed_event_id else None\n", - "\n", - " print(f\" Error {i+1}: Status={status}, Message={message}\")\n", - " if failed_event:\n", - " print(f\" Failed Event Type: {getattr(failed_event, 'type', 'Unknown')}\")\n", - " try:\n", - " body_str = json.dumps(failed_event.body.dict(), indent=2, default=str, ensure_ascii=False)\n", - " print(f\" Failed Event Body (truncated): {body_str[:1000]}{'...' if len(body_str) > 1000 else ''}\")\n", - " except Exception as dump_err: print(f\" Failed Event Body: \")\n", - " else: print(f\" Failed Event ID: {failed_event_id} (Could not find matching event in batch)\")\n", - " break # Break retry loop even if errors occurred, as batch was processed partially\n", - " else:\n", - " print(f\" Successfully ingested trace {source_trace_id}\")\n", - " total_migrated += 1\n", - "\n", - " except Exception as e:\n", - " push_retries += 1\n", - " print(f\" Error pushing batch for trace {source_trace_id} (Attempt {push_retries}/{max_retries}): {e}\")\n", - " if \"429\" in str(e):\n", - " sleep_time = 2 ** push_retries\n", - " print(f\" Rate limit hit on batch(). Sleeping for {sleep_time}s...\")\n", - " time.sleep(sleep_time)\n", - " elif push_retries >= max_retries:\n", - " print(f\" Max retries reached pushing batch for trace {source_trace_id}.\")\n", - " total_failed_push += 1\n", - " else: # Non-429 error during push attempt\n", - " print(f\" Non-rate-limit error pushing batch for trace {source_trace_id}. Failing this trace.\")\n", - " total_failed_push += 1\n", - " break # Stop retrying push for this trace\n", - "\n", - " # Ensure loop eventually terminates if push fails after retries\n", - " if not push_success and push_retries >= max_retries: continue\n", + "\n", + " ok, failed_fetch, failed_transform, failed_push = migrate_trace_by_id(\n", + " langfuse_source=langfuse_source,\n", + " langfuse_destination=langfuse_destination,\n", + " trace_id=source_trace_id,\n", + " sleep_between_gets=sleep_between_gets,\n", + " sleep_between_batches=sleep_between_batches,\n", + " max_retries=max_retries\n", + " )\n", + "\n", + " if ok:\n", + " total_migrated += 1\n", + " total_failed_fetch += failed_fetch\n", + " total_failed_transform += failed_transform\n", + " total_failed_push += failed_push\n", "\n", "\n", " current_page_meta = getattr(trace_list.meta, 'page', page)\n", @@ -739,7 +801,7 @@ " print(\"Langfuse Traces Migration Script\")\n", " print(\"--------------------------------\")\n", " print(\"WARNING: Migrates full trace data. PRESERVES ORIGINAL TRACE IDS.\")\n", - " print(\"Ensure no ID collisions in the destination project!\")\n", + " print(\"Traces with the same id in the destination will not be replaces\")\n", " print(\"Includes retries with exponential backoff for rate limiting.\")\n", " print(\"--------------------------------\\n\")\n", "\n", @@ -804,6 +866,7 @@ "import os\n", "import sys\n", "import time\n", + "import urllib.parse\n", "import datetime as dt\n", "from langfuse import Langfuse\n", "from langfuse.api.resources.datasets.types import CreateDatasetRequest\n", @@ -887,7 +950,7 @@ " if \"404\" in str(get_err):\n", " print(f\" Dataset '{source_dataset.name}' not found in destination. Creating...\")\n", " create_ds_req = CreateDatasetRequest(\n", - " name=source_dataset.name, description=source_dataset.description, metadata=source_dataset.metadata\n", + " name=source_dataset.name, description=source_dataset.description, metadata=source_dataset.metadata\n", " )\n", " time.sleep(sleep_between_calls)\n", " created_dataset = langfuse_destination.api.datasets.create(request=create_ds_req)\n", @@ -934,15 +997,22 @@ " print(f\" Finished processing items for dataset '{source_dataset.name}'. Adding short delay before processing runs.\")\n", " time.sleep(sleep_between_calls * 2)\n", "\n", - " # 3. Migrate Dataset Run Items\n", + " # 3. Migrate Dataset Run Items (and ensure related traces exist in destination)\n", " print(f\" Fetching runs for dataset '{source_dataset.name}'...\")\n", " page_run = 1; limit_run = 100\n", + "\n", + " # Reuse trace migration backoff settings if configured\n", + " sleep_between_gets = float(os.getenv(\"LANGFUSE_MIGRATE_SLEEP_GET\", sleep_between_calls))\n", + " sleep_between_batches = float(os.getenv(\"LANGFUSE_MIGRATE_SLEEP_BATCH\", sleep_between_calls))\n", + " max_retries = int(os.getenv(\"LANGFUSE_MIGRATE_MAX_RETRIES\", 4))\n", + "\n", " while True:\n", " # print(f\" Fetching page {page_run} of runs metadata...\") # Less verbose logging\n", " time.sleep(sleep_between_calls)\n", " try:\n", + " #get_runs improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", " runs_list = langfuse_source.api.datasets.get_runs(\n", - " dataset_name=source_dataset.name, page=page_run, limit=limit_run\n", + " dataset_name=urllib.parse.quote(source_dataset.name), page=page_run, limit=limit_run\n", " )\n", " if not runs_list.data: break # Done with runs for this dataset\n", " # print(f\" Fetched {len(runs_list.data)} runs metadata.\") # Less verbose logging\n", @@ -952,8 +1022,9 @@ " print(f\" Processing run: '{source_run_summary.name}'\")\n", " try:\n", " time.sleep(sleep_between_calls)\n", + " #get_run improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", " source_run_full = langfuse_source.api.datasets.get_run(\n", - " dataset_name=source_dataset.name, run_name=source_run_summary.name\n", + " dataset_name=urllib.parse.quote(source_dataset.name), run_name=urllib.parse.quote(source_run_summary.name)\n", " )\n", " except Exception as e: print(f\" Error fetching full details for run '{source_run_summary.name}': {e}\"); run_links_failed += 1; continue\n", "\n", @@ -962,6 +1033,19 @@ " if not new_dest_item_id: print(f\" Warning: Could not find dest mapping for source item ID {source_run_item.dataset_item_id} in run '{source_run_summary.name}'. Skipping link.\"); run_links_failed += 1; continue\n", " if not source_run_item.trace_id and not source_run_item.observation_id: print(f\" Warning: Source run item for item {source_run_item.dataset_item_id} lacks trace/observation ID. Skipping link.\"); run_links_failed += 1; continue\n", "\n", + " # Ensure the related trace is migrated before creating the run link\n", + " if source_run_item.trace_id:\n", + " ok, _, _, _ = migrate_trace_by_id(\n", + " langfuse_source=langfuse_source,\n", + " langfuse_destination=langfuse_destination,\n", + " trace_id=source_run_item.trace_id,\n", + " sleep_between_gets=sleep_between_gets,\n", + " sleep_between_batches=sleep_between_batches,\n", + " max_retries=max_retries\n", + " )\n", + " if not ok:\n", + " print(f\" Warning: Failed to migrate trace {source_run_item.trace_id} for dataset run '{source_run_summary.name}'. Run link may be incomplete.\")\n", + "\n", " run_metadata = source_run_summary.metadata # Pass original run metadata\n", "\n", " create_run_item_req = CreateDatasetRunItemRequest(\n", From bb84d69bb349e9110ebc94593542211b1f153eee Mon Sep 17 00:00:00 2001 From: ouvreboite Date: Mon, 2 Mar 2026 17:12:10 +0100 Subject: [PATCH 3/5] [Data migration script] Use from/to params for the dataset runs --- cookbook/example_data_migration.ipynb | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/cookbook/example_data_migration.ipynb b/cookbook/example_data_migration.ipynb index 530eb26625..6881a549e1 100644 --- a/cookbook/example_data_migration.ipynb +++ b/cookbook/example_data_migration.ipynb @@ -801,7 +801,7 @@ " print(\"Langfuse Traces Migration Script\")\n", " print(\"--------------------------------\")\n", " print(\"WARNING: Migrates full trace data. PRESERVES ORIGINAL TRACE IDS.\")\n", - " print(\"Traces with the same id in the destination will not be replaces\")\n", + " print(\"Traces with the same id in the destination will not be replaced\")\n", " print(\"Includes retries with exponential backoff for rate limiting.\")\n", " print(\"--------------------------------\\n\")\n", "\n", @@ -892,7 +892,7 @@ " return None\n", "\n", "# --- Main Migration Function ---\n", - "def migrate_datasets(source_config, dest_config, sleep_between_calls=0.4):\n", + "def migrate_datasets(source_config, dest_config, sleep_between_calls, from_ts, to_ts):\n", " \"\"\"\n", " Migrates Datasets, Dataset Items, and Dataset Run Items from source to destination.\n", " ASSUMES Traces/Observations were previously migrated PRESERVING ORIGINAL IDs.\n", @@ -1020,6 +1020,15 @@ "\n", " for source_run_summary in runs_list.data:\n", " print(f\" Processing run: '{source_run_summary.name}'\")\n", + "\n", + " # Respect global migration time window for dataset runs, if configured\n", + " if from_ts and source_run_summary.created_at < from_ts:\n", + " print(f\" Skipping run {source_run_summary.name} as it is before the date threshold\") \n", + " continue\n", + " if to_ts and source_run_summary.created_at > to_ts:\n", + " print(f\" Skipping run {source_run_summary.name} as it is after the date threshold\") \n", + " continue\n", + " \n", " try:\n", " time.sleep(sleep_between_calls)\n", " #get_run improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", @@ -1095,6 +1104,18 @@ " if not source_pk or not source_sk: print(\"Error: Source credentials env vars required.\"); sys.exit(1)\n", " if not dest_pk or not dest_sk: print(\"Error: Destination credentials env vars required.\"); sys.exit(1)\n", "\n", + " # Optional time filters for dataset runs (aligned with trace migration)\n", + " from_ts_str = os.getenv(\"LANGFUSE_MIGRATE_FROM_TIMESTAMP\")\n", + " to_ts_str = os.getenv(\"LANGFUSE_MIGRATE_TO_TIMESTAMP\")\n", + " from_ts = parse_datetime(from_ts_str) if from_ts_str else None\n", + " to_ts = parse_datetime(to_ts_str) if to_ts_str else None\n", + " if from_ts_str and not from_ts:\n", + " print(f\"Error: Invalid LANGFUSE_MIGRATE_FROM_TIMESTAMP='{from_ts_str}'. Skipping time filtering for dataset runs.\")\n", + " from_ts = None\n", + " if to_ts_str and not to_ts:\n", + " print(f\"Error: Invalid LANGFUSE_MIGRATE_TO_TIMESTAMP='{to_ts_str}'. Skipping time filtering for dataset runs.\")\n", + " to_ts = None\n", + "\n", " source_credentials = {\"public_key\": source_pk, \"secret_key\": source_sk, \"host\": source_host}\n", " destination_credentials = {\"public_key\": dest_pk, \"secret_key\": dest_sk, \"host\": dest_host}\n", "\n", @@ -1103,7 +1124,7 @@ "\n", " confirmation = input(\"\\nProceed with dataset migration (ASSUMING Trace IDs were preserved)? (yes/no): \").lower()\n", " if confirmation == 'yes':\n", - " migrate_datasets(source_credentials, destination_credentials, sleep_calls)\n", + " migrate_datasets(source_credentials, destination_credentials, sleep_calls, from_ts, to_ts)\n", " else: print(\"Migration cancelled by user.\")\n" ] }, From 5982d0448fd9d4f47f14e8ac3b7c5fe57d08282f Mon Sep 17 00:00:00 2001 From: ouvreboite Date: Tue, 3 Mar 2026 10:19:23 +0100 Subject: [PATCH 4/5] [Data migration script] Keep relative ordering of runs During the migration, the runs are recreated, so their creation date change. By precollecting all runs and ordering them by date, we ensure that their relative ordering is at least conserved --- cookbook/example_data_migration.ipynb | 102 +++++++++++++++----------- 1 file changed, 58 insertions(+), 44 deletions(-) diff --git a/cookbook/example_data_migration.ipynb b/cookbook/example_data_migration.ipynb index 6881a549e1..d1d06ea369 100644 --- a/cookbook/example_data_migration.ipynb +++ b/cookbook/example_data_migration.ipynb @@ -869,6 +869,7 @@ "import urllib.parse\n", "import datetime as dt\n", "from langfuse import Langfuse\n", + "from langfuse.api import DatasetRun\n", "from langfuse.api.resources.datasets.types import CreateDatasetRequest\n", "from langfuse.api.resources.dataset_items.types import CreateDatasetItemRequest\n", "from langfuse.api.resources.dataset_run_items.types import CreateDatasetRunItemRequest\n", @@ -1006,6 +1007,8 @@ " sleep_between_batches = float(os.getenv(\"LANGFUSE_MIGRATE_SLEEP_BATCH\", sleep_between_calls))\n", " max_retries = int(os.getenv(\"LANGFUSE_MIGRATE_MAX_RETRIES\", 4))\n", "\n", + " # First, collect all runs for this dataset (respecting the global time window)\n", + " runs_to_recreate : list[DatasetRun] = []\n", " while True:\n", " # print(f\" Fetching page {page_run} of runs metadata...\") # Less verbose logging\n", " time.sleep(sleep_between_calls)\n", @@ -1026,53 +1029,64 @@ " print(f\" Skipping run {source_run_summary.name} as it is before the date threshold\") \n", " continue\n", " if to_ts and source_run_summary.created_at > to_ts:\n", - " print(f\" Skipping run {source_run_summary.name} as it is after the date threshold\") \n", + " print(f\" Skipping run {source_run_summary.name} as it is after the date threshold\")\n", " continue\n", - " \n", - " try:\n", - " time.sleep(sleep_between_calls)\n", - " #get_run improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", - " source_run_full = langfuse_source.api.datasets.get_run(\n", - " dataset_name=urllib.parse.quote(source_dataset.name), run_name=urllib.parse.quote(source_run_summary.name)\n", - " )\n", - " except Exception as e: print(f\" Error fetching full details for run '{source_run_summary.name}': {e}\"); run_links_failed += 1; continue\n", - "\n", - " for source_run_item in source_run_full.dataset_run_items:\n", - " new_dest_item_id = item_id_map.get(source_run_item.dataset_item_id)\n", - " if not new_dest_item_id: print(f\" Warning: Could not find dest mapping for source item ID {source_run_item.dataset_item_id} in run '{source_run_summary.name}'. Skipping link.\"); run_links_failed += 1; continue\n", - " if not source_run_item.trace_id and not source_run_item.observation_id: print(f\" Warning: Source run item for item {source_run_item.dataset_item_id} lacks trace/observation ID. Skipping link.\"); run_links_failed += 1; continue\n", - "\n", - " # Ensure the related trace is migrated before creating the run link\n", - " if source_run_item.trace_id:\n", - " ok, _, _, _ = migrate_trace_by_id(\n", - " langfuse_source=langfuse_source,\n", - " langfuse_destination=langfuse_destination,\n", - " trace_id=source_run_item.trace_id,\n", - " sleep_between_gets=sleep_between_gets,\n", - " sleep_between_batches=sleep_between_batches,\n", - " max_retries=max_retries\n", - " )\n", - " if not ok:\n", - " print(f\" Warning: Failed to migrate trace {source_run_item.trace_id} for dataset run '{source_run_summary.name}'. Run link may be incomplete.\")\n", - "\n", - " run_metadata = source_run_summary.metadata # Pass original run metadata\n", - "\n", - " create_run_item_req = CreateDatasetRunItemRequest(\n", - " run_name=source_run_summary.name,\n", - " dataset_item_id=new_dest_item_id,\n", - " trace_id=source_run_item.trace_id,\n", - " observation_id=source_run_item.observation_id,\n", - " run_description=source_run_summary.description,\n", - " metadata=run_metadata or None # Pass original run metadata here\n", - " )\n", - " try:\n", - " time.sleep(sleep_between_calls)\n", - " langfuse_destination.api.dataset_run_items.create(request=create_run_item_req)\n", - " run_links_created += 1\n", - " except Exception as e: print(f\" Error creating run item link for dest item {new_dest_item_id} in run '{source_run_summary.name}': {e}\"); run_links_failed += 1\n", + " runs_to_recreate.append(source_run_summary)\n", + "\n", + " if runs_list.meta.page >= getattr(runs_list.meta, 'total_pages', page_run):\n", + " break\n", "\n", - " if runs_list.meta.page >= getattr(runs_list.meta, 'total_pages', page_run): break\n", " page_run += 1\n", + " time.sleep(sleep_between_calls)\n", + "\n", + " # Sort runs by ascending creation time so oldest runs are recreated first and the relative order is conserved\n", + " runs_to_recreate.sort(key=lambda run: run.created_at)\n", + "\n", + " print(f\" Recreating '{len(runs_to_recreate)}' runs for dataset '{source_dataset.name}'\")\n", + "\n", + " for index, source_run_summary in enumerate(runs_to_recreate):\n", + " print(f\" Processing run '{index}'/'{len(runs_to_recreate)}': '{source_run_summary.name}'\")\n", + " try:\n", + " time.sleep(sleep_between_calls)\n", + " # get_run improperly urlencode its path params, so we do it manually, cf https://github.com/fern-api/fern/issues/13065\n", + " source_run_full = langfuse_source.api.datasets.get_run(\n", + " dataset_name=urllib.parse.quote(source_dataset.name), run_name=urllib.parse.quote(source_run_summary.name)\n", + " )\n", + " except Exception as e: print(f\" Error fetching full details for run '{source_run_summary.name}': {e}\"); run_links_failed += 1; continue\n", + "\n", + " for source_run_item in source_run_full.dataset_run_items:\n", + " new_dest_item_id = item_id_map.get(source_run_item.dataset_item_id)\n", + " if not new_dest_item_id: print(f\" Warning: Could not find dest mapping for source item ID {source_run_item.dataset_item_id} in run '{source_run_summary.name}'. Skipping link.\"); run_links_failed += 1; continue\n", + " if not source_run_item.trace_id and not source_run_item.observation_id: print(f\" Warning: Source run item for item {source_run_item.dataset_item_id} lacks trace/observation ID. Skipping link.\"); run_links_failed += 1; continue\n", + "\n", + " # Ensure the related trace is migrated before creating the run link\n", + " if source_run_item.trace_id:\n", + " ok, _, _, _ = migrate_trace_by_id(\n", + " langfuse_source=langfuse_source,\n", + " langfuse_destination=langfuse_destination,\n", + " trace_id=source_run_item.trace_id,\n", + " sleep_between_gets=sleep_between_gets,\n", + " sleep_between_batches=sleep_between_batches,\n", + " max_retries=max_retries\n", + " )\n", + " if not ok:\n", + " print(f\" Warning: Failed to migrate trace {source_run_item.trace_id} for dataset run '{source_run_summary.name}'. Run link may be incomplete.\")\n", + "\n", + " run_metadata = source_run_summary.metadata # Pass original run metadata\n", + "\n", + " create_run_item_req = CreateDatasetRunItemRequest(\n", + " run_name=source_run_summary.name,\n", + " dataset_item_id=new_dest_item_id,\n", + " trace_id=source_run_item.trace_id,\n", + " observation_id=source_run_item.observation_id,\n", + " run_description=source_run_summary.description,\n", + " metadata=run_metadata or None # Pass original run metadata here\n", + " )\n", + " try:\n", + " time.sleep(sleep_between_calls)\n", + " langfuse_destination.api.dataset_run_items.create(request=create_run_item_req)\n", + " run_links_created += 1\n", + " except Exception as e: print(f\" Error creating run item link for dest item {new_dest_item_id} in run '{source_run_summary.name}': {e}\"); run_links_failed += 1\n", "\n", " if datasets_list.meta.page >= getattr(datasets_list.meta, 'total_pages', page_ds): print(\"Processed the last page of datasets.\"); break\n", " page_ds += 1\n", From 9406afb610f7a6cdc43cf41d3e820b76eb5844f6 Mon Sep 17 00:00:00 2001 From: ouvreboite Date: Tue, 3 Mar 2026 11:39:44 +0100 Subject: [PATCH 5/5] [Data migration script] Maintain observation and score ids --- cookbook/example_data_migration.ipynb | 33 +++++++++++++++------------ 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/cookbook/example_data_migration.ipynb b/cookbook/example_data_migration.ipynb index d1d06ea369..986bf95bfd 100644 --- a/cookbook/example_data_migration.ipynb +++ b/cookbook/example_data_migration.ipynb @@ -314,6 +314,7 @@ "import json\n", "from langfuse import Langfuse\n", "# Corrected location for MapValue:\n", + "from langfuse.api import TraceWithFullDetails\n", "from langfuse.api.resources.commons.types import MapValue\n", "# Ingestion types:\n", "from langfuse.api.resources.ingestion.types import (\n", @@ -502,7 +503,7 @@ " print(f\"Warning: Could not format datetime {dt_obj}: {e}. Returning None.\")\n", " return None\n", "\n", - "def transform_trace_to_ingestion_batch(source_trace):\n", + "def transform_trace_to_ingestion_batch(source_trace: TraceWithFullDetails):\n", " \"\"\"\n", " Transforms a fetched TraceWithFullDetails object into a list of\n", " IngestionEvent objects suitable for the batch ingestion endpoint.\n", @@ -543,9 +544,6 @@ " # 2. Create Observation Events\n", " sorted_observations = sorted(source_trace.observations, key=lambda o: o.start_time)\n", " for source_obs in sorted_observations:\n", - " new_obs_id = str(uuid.uuid4())\n", - " obs_id_map[source_obs.id] = new_obs_id\n", - " new_parent_observation_id = obs_id_map.get(source_obs.parent_observation_id) if source_obs.parent_observation_id else None\n", " obs_metadata = source_obs.metadata if isinstance(source_obs.metadata, dict) else {}\n", "\n", " model_params_mapped = None\n", @@ -553,16 +551,23 @@ " elif source_obs.model_parameters is not None: print(f\"Warning: Obs {source_obs.id} model_parameters type {type(source_obs.model_parameters)}, skipping.\")\n", "\n", " common_body_args = {\n", - " \"id\": new_obs_id, \"trace_id\": preserved_trace_id, \"name\": source_obs.name,\n", - " \"start_time\": source_obs.start_time, \"metadata\": obs_metadata or None,\n", - " \"input\": source_obs.input, \"output\": source_obs.output, \"level\": source_obs.level,\n", - " \"status_message\": source_obs.status_message, \"parent_observation_id\": new_parent_observation_id,\n", - " \"version\": source_obs.version, \"environment\": source_obs.environment if source_obs.environment else \"default\",\n", + " \"id\": source_obs.id, \n", + " \"trace_id\": preserved_trace_id, \n", + " \"name\": source_obs.name,\n", + " \"start_time\": source_obs.start_time, \n", + " \"metadata\": obs_metadata or None,\n", + " \"input\": source_obs.input, \n", + " \"output\": source_obs.output, \n", + " \"level\": source_obs.level,\n", + " \"status_message\": source_obs.status_message, \n", + " \"parent_observation_id\": source_obs.parent_observation_id,\n", + " \"version\": source_obs.version, \n", + " \"environment\": source_obs.environment if source_obs.environment else \"default\",\n", " }\n", "\n", " event_body = None; ingestion_event_type = None\n", " event_specific_timestamp = safe_isoformat(dt.datetime.now(dt.timezone.utc))\n", - " if not event_specific_timestamp: print(f\"Error: Could not format timestamp for obs {new_obs_id}. Skipping.\"); continue\n", + " if not event_specific_timestamp: print(f\"Error: Could not format timestamp for obs {source_obs.id}. Skipping.\"); continue\n", "\n", " try:\n", " if source_obs.type == \"SPAN\":\n", @@ -607,8 +612,6 @@ "\n", " # 3. Create Score Events\n", " for source_score in source_trace.scores:\n", - " new_score_id = str(uuid.uuid4())\n", - " new_observation_id = obs_id_map.get(source_score.observation_id) if source_score.observation_id else None\n", " score_metadata = source_score.metadata if isinstance(source_score.metadata, dict) else {}\n", "\n", " score_body_value = None\n", @@ -635,7 +638,7 @@ "\n", " try:\n", " score_body = ScoreBody(\n", - " id=new_score_id,\n", + " id=source_score.id,\n", " trace_id=preserved_trace_id,\n", " name=source_score.name,\n", " # Pass the correctly typed value\n", @@ -644,7 +647,7 @@ " # string_value=string_value if source_score.data_type == \"CATEGORICAL\" else None, # Optional: maybe pass string_value only for categorical?\n", " source=source_score.source,\n", " comment=source_score.comment,\n", - " observation_id=new_observation_id,\n", + " observation_id=source_score.observation_id,\n", " timestamp=source_score.timestamp,\n", " config_id=source_score.config_id,\n", " metadata=score_metadata or None,\n", @@ -652,7 +655,7 @@ " environment=source_score.environment if source_score.environment else \"default\",\n", " )\n", " event_timestamp_str = safe_isoformat(dt.datetime.now(dt.timezone.utc))\n", - " if not event_timestamp_str: print(f\"Error: Could not format timestamp for score {new_score_id}. Skipping.\"); continue\n", + " if not event_timestamp_str: print(f\"Error: Could not format timestamp for score {source_score.id}. Skipping.\"); continue\n", " event_envelope_id = str(uuid.uuid4())\n", " ingestion_events.append(\n", " IngestionEvent_ScoreCreate(id=event_envelope_id, timestamp=event_timestamp_str, body=score_body)\n",