def upgrade() -> None:
    """Add the nullable, timezone-aware ``cleaned_at`` column to ``tasks``."""
    cleaned_at_column = sa.Column(
        "cleaned_at",
        sa.DateTime(timezone=True),
        nullable=True,
    )
    op.add_column("tasks", cleaned_at_column)


def downgrade() -> None:
    """Drop ``tasks.cleaned_at`` again, reversing :func:`upgrade`."""
    op.drop_column("tasks", "cleaned_at")
tags: + - task-retention + summary: Export Task + description: 'Build a self-contained snapshot of a task''s content surfaces. + + + Returns the exact payload format that POST /rehydrate accepts, so + + export → clean → rehydrate is a round-trip-equivalent operation.' + operationId: export_task_tasks__task_id__export_get + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ExportTaskResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/clean: + post: + tags: + - task-retention + summary: Clean Task + description: 'Delete content-bearing rows for a stale task. + + + Refuses on active tasks, in-flight workflows, or unprocessed events + + regardless of `force`. The `force=true` flag only bypasses the + + idle-threshold check.' + operationId: clean_task_tasks__task_id__clean_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CleanTaskRequest' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CleanTaskResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/rehydrate: + post: + tags: + - task-retention + summary: Rehydrate Task + description: 'Restore content-bearing rows from a snapshot. + + + Refuses if the task isn''t currently in a cleaned state, or if any supplied + + message/state ID already exists in Mongo (catches double-rehydrate).' 
+ operationId: rehydrate_task_tasks__task_id__rehydrate_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RehydrateTaskRequest' + responses: + '204': + description: Successful Response + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: ACPType: @@ -3970,6 +4076,48 @@ components: - checkpoint - metadata title: CheckpointTupleResponse + CleanTaskRequest: + properties: + force: + type: boolean + title: Force + description: Skip the idle-threshold check. Active-workflow and unprocessed-events + checks still apply. Admin use only. + default: false + idle_days: + type: integer + minimum: 1.0 + title: Idle Days + description: Idle threshold in days (ignored when force=true). + default: 7 + type: object + title: CleanTaskRequest + CleanTaskResponse: + properties: + task_id: + type: string + title: Task Id + cleaned_at: + type: string + format: date-time + title: Cleaned At + messages_deleted: + type: integer + title: Messages Deleted + task_states_deleted: + type: integer + title: Task States Deleted + events_deleted: + type: integer + title: Events Deleted + type: object + required: + - task_id + - cleaned_at + - messages_deleted + - task_states_deleted + - events_deleted + title: CleanTaskResponse CreateAPIKeyRequest: properties: agent_id: @@ -4326,6 +4474,33 @@ components: - author - data title: DataContent + DataContentEntity: + properties: + type: + type: string + const: data + title: Type + description: The type of the message, in this case `data`. + default: data + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. 
+ style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + data: + additionalProperties: true + type: object + title: Data + description: The contents of the data message. + type: object + required: + - author + - data + title: DataContentEntity DataDelta: properties: type: @@ -4531,6 +4706,26 @@ components: - task_id - agent_id title: Event + ExportTaskResponse: + properties: + task_id: + type: string + title: Task Id + messages: + items: + $ref: '#/components/schemas/TaskMessageEntity' + type: array + title: Messages + task_states: + items: + $ref: '#/components/schemas/StateEntity' + type: array + title: Task States + type: object + required: + - task_id + title: ExportTaskResponse + description: Wire format mirrors the entity directly — schema parity is intentional. FileAttachment: properties: file_id: @@ -4557,6 +4752,32 @@ components: - type title: FileAttachment description: Represents a file attachment in messages. + FileAttachmentEntity: + properties: + file_id: + type: string + title: File Id + description: The unique ID of the attached file + name: + type: string + title: Name + description: The name of the file + size: + type: integer + title: Size + description: The size of the file in bytes + type: + type: string + title: Type + description: The MIME type or content type of the file + type: object + required: + - file_id + - name + - size + - type + title: FileAttachmentEntity + description: Represents a file attachment in messages. GetCheckpointTupleRequest: properties: thread_id: @@ -4799,6 +5020,42 @@ components: - content_index title: ReasoningContentDelta description: Delta for reasoning content updates + ReasoningContentEntity: + properties: + type: + type: string + const: reasoning + title: Type + description: The type of the message, in this case `reasoning`. 
+ default: reasoning + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + summary: + items: + type: string + type: array + title: Summary + description: A list of short reasoning summaries + content: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Content + description: The reasoning content or chain-of-thought text + type: object + required: + - author + - summary + title: ReasoningContentEntity ReasoningSummaryDelta: properties: type: @@ -4949,6 +5206,26 @@ components: - updated_at title: RegisterAgentResponse description: Response model for registering an agent. + RehydrateTaskRequest: + properties: + task_id: + type: string + title: Task Id + messages: + items: + $ref: '#/components/schemas/TaskMessageEntity' + type: array + title: Messages + task_states: + items: + $ref: '#/components/schemas/StateEntity' + type: array + title: Task States + type: object + required: + - task_id + title: RehydrateTaskRequest + description: Same shape as the export response — round-trip parity. ScheduleActionInfo: properties: workflow_name: @@ -5315,6 +5592,60 @@ components: agent_id is globally unique. + The state is a dictionary of arbitrary data.' + StateEntity: + properties: + id: + anyOf: + - type: string + - type: 'null' + title: Id + description: The task state's unique id + task_id: + type: string + title: Task Id + description: ID of the task this state belongs to. The combination of task_id + and agent_id is globally unique. + agent_id: + type: string + title: Agent Id + description: ID of the agent this state belongs to. The combination of task_id + and agent_id is globally unique. 
+ state: + additionalProperties: true + type: object + title: State + description: The state object that contains arbitrary data + created_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Created At + description: The timestamp when the state was created + updated_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Updated At + description: The timestamp when the state was last updated + type: object + required: + - task_id + - agent_id + - state + title: StateEntity + description: 'Represents a state in the agent system. A state is associated + uniquely with a task and an agent. + + + This entity is used to store states in MongoDB, with each state + + associated with a specific task and agent. The combination of task_id and + agent_id is globally unique. + + The state is a dictionary of arbitrary data.' StreamTaskMessageDelta: properties: @@ -5436,6 +5767,13 @@ components: format: date-time - type: 'null' title: The timestamp when the task was last updated + cleaned_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: The timestamp when the task's content was cleaned for retention compliance; + null when active params: anyOf: - additionalProperties: true @@ -5536,6 +5874,70 @@ components: text: '#/components/schemas/TextDelta' tool_request: '#/components/schemas/ToolRequestDelta' tool_response: '#/components/schemas/ToolResponseDelta' + TaskMessageEntity: + properties: + id: + anyOf: + - type: string + - type: 'null' + title: Id + description: The task message's unique id + task_id: + type: string + title: Task Id + description: ID of the task this message belongs to + content: + oneOf: + - $ref: '#/components/schemas/TextContentEntity' + - $ref: '#/components/schemas/DataContentEntity' + - $ref: '#/components/schemas/ToolRequestContentEntity' + - $ref: '#/components/schemas/ToolResponseContentEntity' + - $ref: '#/components/schemas/ReasoningContentEntity' + title: Content + description: The 
content of the message. This content is not OpenAI compatible. + These are messages that are meant to be displayed to the user. + discriminator: + propertyName: type + mapping: + data: '#/components/schemas/DataContentEntity' + reasoning: '#/components/schemas/ReasoningContentEntity' + text: '#/components/schemas/TextContentEntity' + tool_request: '#/components/schemas/ToolRequestContentEntity' + tool_response: '#/components/schemas/ToolResponseContentEntity' + streaming_status: + anyOf: + - type: string + enum: + - IN_PROGRESS + - DONE + - type: 'null' + title: In case of streaming, this indicates whether the message is still + being streamed or has been completed + created_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Created At + description: The timestamp when the message was created + updated_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Updated At + description: The timestamp when the message was last updated + type: object + required: + - task_id + - content + title: TaskMessageEntity + description: 'Represents a message in the agent system. + + + This entity is used to store messages in MongoDB, with each message + + associated with a specific task.' TaskMessageUpdate: oneOf: - $ref: '#/components/schemas/StreamTaskMessageStart' @@ -5588,6 +5990,13 @@ components: format: date-time - type: 'null' title: The timestamp when the task was last updated + cleaned_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: The timestamp when the task's content was cleaned for retention compliance; + null when active params: anyOf: - additionalProperties: true @@ -5672,6 +6081,45 @@ components: - author - content title: TextContent + TextContentEntity: + properties: + type: + type: string + const: text + title: Type + description: The type of the message, in this case `text`. 
+ default: text + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + format: + $ref: '#/components/schemas/TextFormat' + description: The format of the message. This is used by the client to determine + how to display the message. + default: plain + content: + type: string + title: Content + description: The contents of the text message. + attachments: + anyOf: + - items: + $ref: '#/components/schemas/FileAttachmentEntity' + type: array + - type: 'null' + title: Attachments + description: Optional list of file attachments with structured metadata. + type: object + required: + - author + - content + title: TextContentEntity TextDelta: properties: type: @@ -5732,6 +6180,43 @@ components: - name - arguments title: ToolRequestContent + ToolRequestContentEntity: + properties: + type: + type: string + const: tool_request + title: Type + description: The type of the message, in this case `tool_request`. + default: tool_request + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + tool_call_id: + type: string + title: Tool Call Id + description: The ID of the tool call that is being requested. + name: + type: string + title: Name + description: The name of the tool that is being requested. + arguments: + additionalProperties: true + type: object + title: Arguments + description: The arguments to the tool. 
+ type: object + required: + - author + - tool_call_id + - name + - arguments + title: ToolRequestContentEntity ToolRequestDelta: properties: type: @@ -5792,6 +6277,41 @@ components: - name - content title: ToolResponseContent + ToolResponseContentEntity: + properties: + type: + type: string + const: tool_response + title: Type + description: The type of the message, in this case `tool_response`. + default: tool_response + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + tool_call_id: + type: string + title: Tool Call Id + description: The ID of the tool call that is being responded to. + name: + type: string + title: Name + description: The name of the tool that is being responded to. + content: + title: Content + description: The result of the tool. + type: object + required: + - author + - tool_call_id + - name + - content + title: ToolResponseContentEntity ToolResponseDelta: properties: type: diff --git a/agentex/src/adapters/crud_store/adapter_mongodb.py b/agentex/src/adapters/crud_store/adapter_mongodb.py index 7a1eb6e..0d497e3 100644 --- a/agentex/src/adapters/crud_store/adapter_mongodb.py +++ b/agentex/src/adapters/crud_store/adapter_mongodb.py @@ -298,6 +298,20 @@ async def batch_create(self, items: list[T]) -> list[T]: message="One or more items have duplicate id values. IDs must be unique.", detail=str(e), ) from e + except pymongo.errors.BulkWriteError as e: + # insert_many raises BulkWriteError wrapping individual write errors. + # Translate to DuplicateItemError only when all underlying errors are + # duplicate-key (11000); otherwise surface as a generic ServiceError + # so callers can distinguish "fix your IDs" from "transient failure". 
+ write_errors = e.details.get("writeErrors", []) if e.details else [] + if write_errors and all(we.get("code") == 11000 for we in write_errors): + raise DuplicateItemError( + message="One or more items have duplicate id values. IDs must be unique.", + detail=str(e), + ) from e + raise ServiceError( + message=f"Failed to batch create items in MongoDB: {e}", detail=str(e) + ) from e except ClientError: raise except Exception as e: diff --git a/agentex/src/adapters/orm.py b/agentex/src/adapters/orm.py index ac5ee39..42a66c1 100644 --- a/agentex/src/adapters/orm.py +++ b/agentex/src/adapters/orm.py @@ -72,6 +72,7 @@ class TaskORM(BaseORM): updated_at = Column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now() ) + cleaned_at = Column(DateTime(timezone=True), nullable=True) params = Column(JSONB, nullable=True) task_metadata = Column(JSONB, nullable=True) # Many-to-Many relationship with agents diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index f325b43..35d4c7f 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -27,6 +27,7 @@ schedules, spans, states, + task_retention, tasks, ) from src.config import dependencies @@ -187,6 +188,7 @@ async def handle_unexpected(request, exc): fastapi_app.include_router(deployments.router) fastapi_app.include_router(schedules.router) fastapi_app.include_router(checkpoints.router) +fastapi_app.include_router(task_retention.router) # Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses. # This must be the outermost layer to bypass all middleware. diff --git a/agentex/src/api/routes/task_retention.py b/agentex/src/api/routes/task_retention.py new file mode 100644 index 0000000..378bd8d --- /dev/null +++ b/agentex/src/api/routes/task_retention.py @@ -0,0 +1,90 @@ +""" +Task retention endpoints — export / clean / rehydrate. + +These power both local-dev testing of the round-trip and the long-term +admin / external-caller integration surface. 
# NOTE: the docstrings of the three handlers below are emitted verbatim as the
# operation `description` in openapi.yaml, so they are treated as runtime text
# and kept unchanged; explanatory notes live in `#` comments instead.


@router.get(
    "/{task_id}/export",
    response_model=ExportTaskResponse,
)
async def export_task(
    task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.read),
    use_case: DTaskRetentionUseCase,
) -> ExportTaskResponse:
    """
    Build a self-contained snapshot of a task's content surfaces.

    Returns the exact payload format that POST /rehydrate accepts, so
    export → clean → rehydrate is a round-trip-equivalent operation.
    """
    # Read-level authorization has already been applied to `task_id`.
    snapshot = await use_case.export_task(task_id)
    # Re-validate the domain snapshot into the wire schema.
    return ExportTaskResponse.model_validate(snapshot)


@router.post(
    "/{task_id}/clean",
    response_model=CleanTaskResponse,
)
async def clean_task(
    task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.delete),
    request: CleanTaskRequest,
    use_case: DTaskRetentionUseCase,
) -> CleanTaskResponse:
    """
    Delete content-bearing rows for a stale task.

    Refuses on active tasks, in-flight workflows, or unprocessed events
    regardless of `force`. The `force=true` flag only bypasses the
    idle-threshold check.
    """
    # Delete-level authorization has already been applied to `task_id`.
    audit = await use_case.clean_task(
        task_id=task_id,
        force=request.force,
        idle_days=request.idle_days,
    )
    return CleanTaskResponse.model_validate(audit)


@router.post(
    "/{task_id}/rehydrate",
    status_code=204,
)
async def rehydrate_task(
    task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.update),
    request: RehydrateTaskRequest,
    use_case: DTaskRetentionUseCase,
) -> None:
    """
    Restore content-bearing rows from a snapshot.

    Refuses if the task isn't currently in a cleaned state, or if any supplied
    message/state ID already exists in Mongo (catches double-rehydrate).
    """
    # Function-scope import: this module does not import ClientError at the top.
    from src.domain.exceptions import ClientError

    # Authorization is evaluated against the *path* task_id only, but the body
    # carries its own task_id plus a task_id on every message/state row.
    # Reject any payload that is not entirely about the authorized task so a
    # caller cannot write content into a task they were not authorized for.
    # NOTE(review): the use case may enforce this as well — it is not visible
    # from this module — so this is a boundary check, not a replacement.
    if request.task_id != task_id:
        raise ClientError(
            f"Snapshot task_id {request.task_id!r} does not match "
            f"path task_id {task_id!r}"
        )
    if any(
        row.task_id != task_id
        for row in (*request.messages, *request.task_states)
    ):
        raise ClientError(
            f"Snapshot for task {task_id} contains messages or task states "
            f"that belong to a different task"
        )

    await use_case.rehydrate_task(task_id=task_id, snapshot=request)
class TaskSnapshotEntity(BaseModel):
    """
    Self-contained, restorable snapshot of a task's content surfaces.

    Used as the response body of GET /tasks/{id}/export and the request body of
    POST /tasks/{id}/rehydrate. Schema parity between the two directions is the
    invariant that makes export → clean → rehydrate a round-trip-equivalent
    operation.

    Scope note: tasks.params (JSONB) is intentionally NOT part of the snapshot
    or the cleanup surface for v1. It may carry initial-message content for
    some agents; if that becomes a compliance gap, add it as a follow-up.
    Events are also NOT included — they are a transient delivery surface;
    consumed events have no live readers (see agent_task_tracker cursor).
    """

    # ID of the task the snapshot describes; the only required field.
    task_id: str
    # Message rows captured for the task. Omitting the field yields a fresh
    # empty list (default_factory), never a shared mutable default.
    messages: list[TaskMessageEntity] = Field(default_factory=list)
    # Task-state rows captured for the task; same default semantics as above.
    task_states: list[StateEntity] = Field(default_factory=list)


class TaskCleanupResultEntity(BaseModel):
    """
    Per-invocation result of a cleanup operation.

    Returned to callers of POST /tasks/{id}/clean and emitted as a structured
    log line (forensic record). Not persisted to a dedicated table in v1 —
    Datadog log search is the audit trail.
    """

    # ID of the task the cleanup was requested for.
    task_id: str
    # Value of the task's cleaned_at marker reported for this invocation.
    cleaned_at: datetime
    # Row counts removed from each content surface by this invocation.
    messages_deleted: int
    task_states_deleted: int
    events_deleted: int
import ( PostgresCRUDRepository, @@ -120,6 +121,25 @@ async def update_agent_task_tracker( return result + async def reset_cursors_for_task(self, task_id: str) -> int: + """ + Reset last_processed_event_id to NULL for every tracker tied to a + task. Used during retention cleanup so that any future events arriving + after rehydration are processed from scratch. Returns rows updated. + """ + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + stmt = ( + update(AgentTaskTrackerORM) + .where(AgentTaskTrackerORM.task_id == task_id) + .values(last_processed_event_id=None) + ) + result = await session.execute(stmt) + await session.commit() + return result.rowcount or 0 + DAgentTaskTrackerRepository = Annotated[ AgentTaskTrackerRepository, Depends(AgentTaskTrackerRepository) diff --git a/agentex/src/domain/repositories/event_repository.py b/agentex/src/domain/repositories/event_repository.py index 1aa750c..e69e9ff 100644 --- a/agentex/src/domain/repositories/event_repository.py +++ b/agentex/src/domain/repositories/event_repository.py @@ -1,7 +1,7 @@ from typing import Annotated from fastapi import Depends -from sqlalchemy import and_ +from sqlalchemy import and_, delete from sqlalchemy.dialects.postgresql import insert from sqlalchemy.future import select from src.adapters.crud_store.adapter_postgres import ( @@ -115,5 +115,16 @@ async def list_events_after_last_processed( return [EventEntity.model_validate(orm) for orm in event_orms] + async def delete_by_task_id(self, task_id: str) -> int: + """Delete all events for a task. Idempotent. 
Returns rows deleted.""" + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + stmt = delete(EventORM).where(EventORM.task_id == task_id) + result = await session.execute(stmt) + await session.commit() + return result.rowcount or 0 + DEventRepository = Annotated[EventRepository, Depends(EventRepository)] diff --git a/agentex/src/domain/services/task_retention_service.py b/agentex/src/domain/services/task_retention_service.py new file mode 100644 index 0000000..825cdbc --- /dev/null +++ b/agentex/src/domain/services/task_retention_service.py @@ -0,0 +1,370 @@ +""" +TaskRetentionService — exports, cleans, and rehydrates task content. + +The same service methods back both the HTTP endpoints (admin / external +caller integration) and the Temporal cleanup activity (scheduled sweep). +Domain logic lives here; api/routes/ and src/temporal/ are thin wrappers. + +Cross-database operation ordering (see clean_task for details): +- Mongo deletes first (each delete-by-task-id is naturally idempotent). +- Postgres transaction last (carries the cleaned_at marker that gates re-cleaning). +- Temporal workflow termination outside the DB transaction (best-effort; history + retention will eventually expire it anyway). 
+""" + +from datetime import UTC, datetime, timedelta +from typing import Annotated + +from fastapi import Depends + +from src.adapters.temporal.adapter_temporal import DTemporalAdapter +from src.domain.entities.task_retention import ( + TaskCleanupResultEntity, + TaskSnapshotEntity, +) +from src.domain.entities.tasks import TaskStatus +from src.domain.exceptions import ClientError +from src.domain.repositories.agent_task_tracker_repository import ( + DAgentTaskTrackerRepository, +) +from src.domain.repositories.event_repository import DEventRepository +from src.domain.repositories.task_message_repository import DTaskMessageRepository +from src.domain.repositories.task_repository import DTaskRepository +from src.domain.repositories.task_state_repository import DTaskStateRepository +from src.domain.services.task_message_service import DTaskMessageService +from src.utils.logging import make_logger + +logger = make_logger(__name__) + +# Page size for paginated reads during export. Big enough to make most tasks +# fit in a single round-trip; small enough that a single oversized task can't +# blow the request budget. Revisit if export latency becomes an issue. 
+EXPORT_PAGE_SIZE = 500
+
+
+class TaskRetentionService:
+    """
+    Export / clean / rehydrate the content-bearing rows of a single task.
+
+    Content lives in Mongo (messages, task_states) and Postgres (events,
+    agent_task_tracker cursors, tasks.cleaned_at). Each public method documents
+    the order in which it touches those stores.
+    """
+
+    def __init__(
+        self,
+        task_repository: DTaskRepository,
+        task_message_service: DTaskMessageService,
+        task_message_repository: DTaskMessageRepository,
+        task_state_repository: DTaskStateRepository,
+        event_repository: DEventRepository,
+        agent_task_tracker_repository: DAgentTaskTrackerRepository,
+        temporal_adapter: DTemporalAdapter,
+    ):
+        self.task_repository = task_repository
+        self.task_message_service = task_message_service
+        self.task_message_repository = task_message_repository
+        self.task_state_repository = task_state_repository
+        self.event_repository = event_repository
+        self.agent_task_tracker_repository = agent_task_tracker_repository
+        # NOTE(review): no method of this class reads temporal_adapter. It is
+        # kept because callers construct the service with it.
+        self.temporal_adapter = temporal_adapter
+
+    async def export_task(self, task_id: str) -> TaskSnapshotEntity:
+        """
+        Build a self-contained snapshot of all content-bearing rows for a task.
+
+        Works for both active and cleaned tasks:
+        - Active: returns current live state (useful for debugging, ops snapshots).
+        - Cleaned: returns the (empty) state — primarily a no-op safety check that
+          tells the caller "nothing to export."
+
+        Raises if the task does not exist.
+        """
+        # 1. Existence check only; the row itself is not part of the snapshot.
+        #    task_repository.get raises if missing.
+        await self.task_repository.get(id=task_id)
+
+        # 2. Messages, ordered chronologically so the snapshot replays cleanly
+        #    on rehydrate.
+        # NOTE(review): ordering is by created_at alone. If the store does not
+        # break ties deterministically, rows sharing a timestamp could be
+        # skipped or repeated across a page boundary — confirm.
+        async def fetch_messages(page_number: int):
+            return await self.task_message_service.get_messages(
+                task_id=task_id,
+                limit=EXPORT_PAGE_SIZE,
+                page_number=page_number,
+                order_by="created_at",
+                order_direction="asc",
+            )
+
+        # 3. Task states, same ordering.
+        async def fetch_task_states(page_number: int):
+            return await self.task_state_repository.find_by_field(
+                "task_id",
+                task_id,
+                limit=EXPORT_PAGE_SIZE,
+                page_number=page_number,
+                sort_by={"created_at": 1},
+            )
+
+        messages = await self._collect_pages(fetch_messages)
+        task_states = await self._collect_pages(fetch_task_states)
+
+        return TaskSnapshotEntity(
+            task_id=task_id,
+            messages=messages,
+            task_states=task_states,
+        )
+
+    async def clean_task(
+        self,
+        task_id: str,
+        *,
+        enforce_idle_threshold: bool = True,
+        idle_days: int = 7,
+    ) -> TaskCleanupResultEntity:
+        """
+        Delete content-bearing rows for a stale task. Idempotent: re-running on a
+        partially-cleaned or fully-cleaned task is safe.
+
+        Args:
+            task_id: The task to clean.
+            enforce_idle_threshold: When True (default), refuses to clean a task
+                whose last interaction is more recent than `idle_days`. The
+                scheduled Temporal sweep always sets True. The admin endpoint
+                accepts a force=true flag that flips this to False.
+            idle_days: Idle threshold in days (when enforce_idle_threshold=True).
+                Must be >= 0 when the threshold is enforced.
+
+        Refuses (raises ClientError) if:
+        - task is currently active (status == RUNNING).
+        - enforce_idle_threshold=True and idle_days is negative.
+        - enforce_idle_threshold=True and the task is not idle long enough.
+        - unprocessed events exist past agent_task_tracker cursors.
+
+        If cleaned_at IS NOT NULL (already cleaned), returns an empty result
+        (zero rows deleted) rather than raising.
+
+        Returns the result record describing what was deleted. The same fields,
+        plus the idle-check settings used, are emitted as a structured log line
+        for forensics.
+
+        ORDER OF OPERATIONS (load-bearing for retry safety):
+        1. Reload task fresh. Bail with empty-result if cleaned_at is set.
+        2. Verify task status and (if enforced) idle threshold.
+        3. Verify no events past agent_task_tracker.last_processed_event_id.
+           OPTIMISTIC: no row locks. Race window: a new EVENT_SEND between
+           this check and step 6a will be deleted with the rest; one landing
+           between 6a and 6c survives on the cleaned task. Acceptable
+           because (a) it can only happen on a task that's been idle ≥7d and
+           then suddenly receives an event — rare; (b) the structured log
+           below surfaces any cleanup with events_deleted > 0 on an
+           idle-checked task, giving forensic signal.
+        4. Mongo: delete messages by task_id (idempotent).
+        5. Mongo: delete task_states by task_id (idempotent).
+        6. Postgres (separate operations, each idempotent):
+           a. delete events by task_id
+           b. reset agent_task_tracker cursors for task_id
+           c. update tasks.cleaned_at = now()
+        7. Emit structured log with the TaskCleanupResultEntity payload.
+
+        Note on Temporal workflows: Agentex doesn't own workflow IDs for agent
+        tasks (the agent's ACP server creates them). By the time a task is
+        idle ≥7d, any associated workflow should already be terminal.
+        The active-status guard in step 2 catches the case where it isn't.
+        """
+        # 1. Reload task; bail if already cleaned.
+        task = await self.task_repository.get(id=task_id)
+        if task.cleaned_at is not None:
+            return TaskCleanupResultEntity(
+                task_id=task_id,
+                cleaned_at=task.cleaned_at,
+                messages_deleted=0,
+                task_states_deleted=0,
+                events_deleted=0,
+            )
+
+        # 2. Status + idle threshold guards.
+        if task.status == TaskStatus.RUNNING:
+            raise ClientError(
+                f"Cannot clean task {task_id}: status is RUNNING (active)"
+            )
+        if enforce_idle_threshold:
+            # A negative window pushes the cutoff into the future, so every
+            # task would look idle and the guard would be bypassed without
+            # force=true.
+            if idle_days < 0:
+                raise ClientError(
+                    f"Cannot clean task {task_id}: idle_days must be >= 0 "
+                    f"(got {idle_days})"
+                )
+            if not await self._is_task_idle(task, idle_days):
+                raise ClientError(
+                    f"Cannot clean task {task_id}: not idle for {idle_days} days "
+                    f"(use force=true to override)"
+                )
+
+        # 3. Unprocessed-events guard.
+        if await self._has_unprocessed_events(task_id):
+            raise ClientError(f"Cannot clean task {task_id}: unprocessed events remain")
+
+        # 4-5. Mongo deletes.
+        messages_deleted = await self.task_message_service.delete_all_messages(task_id)
+        task_states_deleted = await self.task_state_repository.delete_by_field(
+            "task_id", task_id
+        )
+
+        # 6a-b. Postgres deletes / resets.
+        events_deleted = await self.event_repository.delete_by_task_id(task_id)
+        await self.agent_task_tracker_repository.reset_cursors_for_task(task_id)
+
+        # 6c. Mark task as cleaned. Done last so that a failure in any earlier
+        # step leaves cleaned_at NULL and a retry re-runs the deletes.
+        cleaned_at = datetime.now(UTC)
+        task.cleaned_at = cleaned_at
+        await self.task_repository.update(task)
+
+        result = TaskCleanupResultEntity(
+            task_id=task_id,
+            cleaned_at=cleaned_at,
+            messages_deleted=messages_deleted,
+            task_states_deleted=task_states_deleted,
+            events_deleted=events_deleted,
+        )
+
+        # 7. Forensic log line. Structured extras so Datadog can facet on them.
+        # The idle-check settings are included so "events_deleted > 0 on an
+        # idle-checked task" can be told apart from a forced cleanup.
+        logger.info(
+            "task_cleanup_completed",
+            extra={
+                "task_id": result.task_id,
+                "cleaned_at": result.cleaned_at.isoformat(),
+                "messages_deleted": result.messages_deleted,
+                "task_states_deleted": result.task_states_deleted,
+                "events_deleted": result.events_deleted,
+                "idle_threshold_enforced": enforce_idle_threshold,
+                "idle_days": idle_days,
+            },
+        )
+
+        return result
+
+    async def rehydrate_task(
+        self,
+        task_id: str,
+        snapshot: TaskSnapshotEntity,
+    ) -> None:
+        """
+        Restore content-bearing rows from a snapshot. Inverse of clean_task.
+
+        Refuses (raises) if:
+        - snapshot.task_id != task_id (catch payload misuse).
+        - any embedded message/task_state carries a different task_id.
+        - the snapshot repeats a message.id or a task_state.id within itself
+          (an ordered batch insert would commit the rows before the repeat).
+        - cleaned_at IS NULL on the task (would clobber live data).
+        - any supplied message.id or task_state.id already exists in Mongo
+          (collision → DuplicateItemError surfaced from the adapter).
+
+        Order of operations (mirror of clean_task):
+        1. Reload task; verify cleaned_at IS NOT NULL.
+        2. Mongo: batch insert messages with caller-supplied IDs.
+        3. Mongo: batch insert task_states with caller-supplied IDs.
+        4. Postgres: update tasks set cleaned_at = NULL.
+        (Events are not restored; cursors stay NULL from clean_task.
+        tasks.params is not touched — out of scope for v1.)
+
+        Partial-insert hazard: insert_many is ordered, so an ID that collides
+        with an existing document in the middle of the batch leaves prior
+        inserts committed. Acceptable
+        for v1 — the typical "double rehydrate" case has the collision in
+        position 0 (no prior commits). Operators can recover by manually
+        deleting the partial inserts and retrying.
+
+        Note: ID preservation requires the caller to capture original Agentex IDs
+        at write time and store them alongside content in their external system.
+        This is a contract on the caller's integration, not enforced by Agentex.
+        """
+        # Validate payload before touching anything.
+        if snapshot.task_id != task_id:
+            raise ClientError(
+                f"Snapshot task_id ({snapshot.task_id}) does not match "
+                f"path task_id ({task_id})"
+            )
+
+        # Reject any embedded entity whose task_id disagrees with the path.
+        # Mongo has no foreign key to tasks, so an unchecked batch_create here
+        # would write rows that get tagged to a task the caller may not own.
+        for i, message in enumerate(snapshot.messages):
+            if message.task_id != task_id:
+                raise ClientError(
+                    f"Snapshot message[{i}] task_id ({message.task_id}) does not "
+                    f"match path task_id ({task_id})"
+                )
+        for i, state in enumerate(snapshot.task_states):
+            if state.task_id != task_id:
+                raise ClientError(
+                    f"Snapshot task_states[{i}] task_id ({state.task_id}) does "
+                    f"not match path task_id ({task_id})"
+                )
+
+        # Reject IDs repeated inside the payload itself. Without this check the
+        # repeat would only fail inside the batch insert, after the rows that
+        # precede it have been committed.
+        for label, entities in (
+            ("messages", snapshot.messages),
+            ("task_states", snapshot.task_states),
+        ):
+            duplicate_id = self._first_duplicate_id(entities)
+            if duplicate_id is not None:
+                raise ClientError(
+                    f"Snapshot {label} contain duplicate id ({duplicate_id}); "
+                    f"each id may appear only once"
+                )
+
+        # 1. Reload task; refuse if not in cleaned state.
+        task = await self.task_repository.get(id=task_id)
+        if task.cleaned_at is None:
+            raise ClientError(
+                f"Cannot rehydrate task {task_id}: task is not in cleaned state "
+                f"(cleaned_at is NULL)"
+            )
+
+        # 2. Insert messages with caller-supplied IDs.
+        if snapshot.messages:
+            await self.task_message_repository.batch_create(snapshot.messages)
+
+        # 3. Insert task_states with caller-supplied IDs.
+        if snapshot.task_states:
+            await self.task_state_repository.batch_create(snapshot.task_states)
+
+        # 4. Clear cleaned_at on the task row.
+        task.cleaned_at = None
+        await self.task_repository.update(task)
+
+        logger.info(
+            "task_rehydrate_completed",
+            extra={
+                "task_id": task_id,
+                "messages_restored": len(snapshot.messages),
+                "task_states_restored": len(snapshot.task_states),
+            },
+        )
+
+    # ---- internal helpers ----
+
+    @staticmethod
+    async def _collect_pages(fetch_page) -> list:
+        """
+        Drain a 1-based paginated read into a single list.
+
+        `fetch_page` is an async callable taking the page number and returning
+        one page of at most EXPORT_PAGE_SIZE rows. A short (or empty) page marks
+        the end of the result set.
+        """
+        items = []
+        page_number = 1
+        while True:
+            page = await fetch_page(page_number)
+            items.extend(page)
+            if len(page) < EXPORT_PAGE_SIZE:
+                return items
+            page_number += 1
+
+    @staticmethod
+    def _first_duplicate_id(entities):
+        """
+        Return the first `id` that occurs twice in `entities`, or None.
+
+        Entities whose id is None are ignored; they cannot collide with each
+        other by value.
+        """
+        seen = set()
+        for entity in entities:
+            entity_id = entity.id
+            if entity_id is None:
+                continue
+            if entity_id in seen:
+                return entity_id
+            seen.add(entity_id)
+        return None
+
+    @staticmethod
+    def _as_utc(value):
+        """
+        Return `value` as an aware datetime, reading naive values as UTC.
+
+        None passes through unchanged. Comparing a naive datetime with an aware
+        one raises TypeError, so every timestamp is normalized before use.
+        """
+        if value is not None and value.tzinfo is None:
+            return value.replace(tzinfo=UTC)
+        return value
+
+    async def _is_task_idle(self, task, idle_days: int) -> bool:
+        """
+        True iff the task has no interaction within the idle window.
+
+        Last-interaction = max(task.updated_at, latest message created_at).
+        `task.updated_at` alone would miss tasks where the only recent
+        activity is Mongo message writes (which don't bump the Postgres row).
+        A task with neither timestamp is treated as idle.
+        """
+        cutoff = datetime.now(UTC) - timedelta(days=idle_days)
+        # NOTE(review): a naive updated_at is assumed to be UTC, matching the
+        # treatment of Mongo timestamps below — confirm against the column type.
+        last_interaction = self._as_utc(task.updated_at)
+
+        latest_messages = await self.task_message_service.get_messages(
+            task_id=task.id,
+            limit=1,
+            page_number=1,
+            order_by="created_at",
+            order_direction="desc",
+        )
+        if latest_messages and latest_messages[0].created_at is not None:
+            # Mongo timestamps come back as naive datetimes; treat as UTC so
+            # they compare cleanly with Postgres TIMESTAMPTZ values.
+            latest_at = self._as_utc(latest_messages[0].created_at)
+            if last_interaction is None or latest_at > last_interaction:
+                last_interaction = latest_at
+
+        if last_interaction is None:
+            return True
+        return last_interaction < cutoff
+
+    async def _has_unprocessed_events(self, task_id: str) -> bool:
+        """
+        True iff any events exist past agent_task_tracker.last_processed_event_id
+        for any (task, agent) pair tied to this task.
+
+        Only agents that have a tracker row for the task are consulted; each
+        lookup asks for at most one event, since existence is all that matters.
+        """
+        trackers = await self.agent_task_tracker_repository.find_by_field(
+            "task_id", task_id
+        )
+        for tracker in trackers:
+            pending = await self.event_repository.list_events_after_last_processed(
+                task_id=task_id,
+                agent_id=tracker.agent_id,
+                last_processed_event_id=tracker.last_processed_event_id,
+                limit=1,
+            )
+            if pending:
+                return True
+        return False
+
+
+DTaskRetentionService = Annotated[TaskRetentionService, Depends(TaskRetentionService)]
diff --git a/agentex/src/domain/use_cases/task_retention_use_case.py b/agentex/src/domain/use_cases/task_retention_use_case.py
new file mode 100644
index 0000000..6544e09
--- /dev/null
+++ b/agentex/src/domain/use_cases/task_retention_use_case.py
@@ -0,0 +1,53 @@
+from typing import Annotated
+
+from fastapi import Depends
+
+from src.domain.entities.task_retention import (
+    TaskCleanupResultEntity,
+    TaskSnapshotEntity,
+)
+from src.domain.services.task_retention_service import DTaskRetentionService
+
+
+class TaskRetentionUseCase:
+    """
+    Orchestrates export / clean / rehydrate operations for retention compliance.
+    Backs both the HTTP admin endpoints and the Temporal scheduled cleanup
+    activity; keep this layer thin so both callers exercise identical logic.
+    """
+
+    def __init__(self, retention_service: DTaskRetentionService):
+        self.retention_service = retention_service
+
+    async def export_task(self, task_id: str) -> TaskSnapshotEntity:
+        return await self.retention_service.export_task(task_id)
+
+    async def clean_task(
+        self,
+        task_id: str,
+        force: bool = False,
+        idle_days: int = 7,
+    ) -> TaskCleanupResultEntity:
+        """
+        force=True is the admin escape hatch; it bypasses the idle-threshold
+        check (but NOT the RUNNING-status / unprocessed-events checks, which
+        protect correctness, not policy).
+        """
+        return await self.retention_service.clean_task(
+            task_id=task_id,
+            enforce_idle_threshold=not force,
+            idle_days=idle_days,
+        )
+
+    async def rehydrate_task(
+        self,
+        task_id: str,
+        snapshot: TaskSnapshotEntity,
+    ) -> None:
+        await self.retention_service.rehydrate_task(
+            task_id=task_id,
+            snapshot=snapshot,
+        )
+
+
+DTaskRetentionUseCase = Annotated[TaskRetentionUseCase, Depends(TaskRetentionUseCase)]
diff --git a/agentex/tests/integration/api/task_retention/__init__.py b/agentex/tests/integration/api/task_retention/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/agentex/tests/integration/api/task_retention/test_task_retention_api.py b/agentex/tests/integration/api/task_retention/test_task_retention_api.py
new file mode 100644
index 0000000..2381fc2
--- /dev/null
+++ b/agentex/tests/integration/api/task_retention/test_task_retention_api.py
@@ -0,0 +1,392 @@
+"""
+Integration tests for task retention endpoints: export / clean / rehydrate.
+
+Covers the round-trip invariant (export → clean → rehydrate → export yields a
+byte-identical snapshot), each precondition guard, and the cross-store cleanup
+surfaces (Mongo messages, Mongo task_states, Postgres events, Postgres
+agent_task_tracker cursor, Postgres tasks.cleaned_at).
+"""
+
+import pytest
+import pytest_asyncio
+from src.domain.entities.agents import ACPType, AgentEntity
+from src.domain.entities.states import StateEntity
+from src.domain.entities.task_messages import TaskMessageEntity, TextContentEntity
+from src.domain.entities.tasks import TaskEntity, TaskStatus
+from src.utils.ids import orm_id
+
+
+@pytest.mark.asyncio
+class TestTaskRetentionAPIIntegration:
+    """Integration tests for /tasks/{id}/{export,clean,rehydrate}."""
+
+    @pytest_asyncio.fixture
+    async def test_agent(self, isolated_repositories):
+        agent_repo = isolated_repositories["agent_repository"]
+        return await agent_repo.create(
+            AgentEntity(
+                id=orm_id(),
+                name="test-retention-agent",
+                description="Agent for retention testing",
+                acp_url="http://test-acp:8000",
+                acp_type=ACPType.SYNC,
+            )
+        )
+
+    @pytest_asyncio.fixture
+    async def stale_task(self, isolated_repositories, test_agent):
+        """Non-RUNNING task — eligible for cleanup."""
+        task_repo = isolated_repositories["task_repository"]
+        return await task_repo.create(
+            agent_id=test_agent.id,
+            task=TaskEntity(
+                id=orm_id(),
+                name="stale-task",
+                status=TaskStatus.FAILED,
+                status_reason="test fixture",
+            ),
+        )
+
+    @pytest_asyncio.fixture
+    async def running_task(self, isolated_repositories, test_agent):
+        """RUNNING task — should be refused by clean."""
+        task_repo = isolated_repositories["task_repository"]
+        return await task_repo.create(
+            agent_id=test_agent.id,
+            task=TaskEntity(
+                id=orm_id(),
+                name="running-task",
+                status=TaskStatus.RUNNING,
+                status_reason="test fixture",
+            ),
+        )
+
+    async def _seed_messages(self, isolated_repositories, task_id, count):
+        message_repo = isolated_repositories["task_message_repository"]
+        messages = []
+        for i in range(count):
+            messages.append(
+                await message_repo.create(
+                    TaskMessageEntity(
+                        id=orm_id(),
+                        task_id=task_id,
+                        content=TextContentEntity(
+                            type="text", author="user", content=f"msg {i}"
+                        ),
+                        streaming_status="DONE",
+                    )
+                )
+            )
+        return messages
+
+    async def _seed_state(self, isolated_repositories, task_id, agent_id):
+        state_repo = isolated_repositories["task_state_repository"]
+        return await state_repo.create(
+            StateEntity(
+                id=orm_id(),
+                task_id=task_id,
+                agent_id=agent_id,
+                state={"counter": 1, "nested": {"k": "v"}},
+            )
+        )
+
+    # ---- export ----
+
+    async def test_export_returns_full_snapshot(
+        self, isolated_client, isolated_repositories, stale_task, test_agent
+    ):
+        await self._seed_messages(isolated_repositories, stale_task.id, 3)
+        await self._seed_state(isolated_repositories, stale_task.id, test_agent.id)
+
+        response = await isolated_client.get(f"/tasks/{stale_task.id}/export")
+
+        assert response.status_code == 200
+        snapshot = response.json()
+        assert snapshot["task_id"] == stale_task.id
+        assert len(snapshot["messages"]) == 3
+        assert len(snapshot["task_states"]) == 1
+        # Messages ordered chronologically (asc by created_at)
+        contents = [m["content"]["content"] for m in snapshot["messages"]]
+        assert contents == ["msg 0", "msg 1", "msg 2"]
+
+    async def test_export_empty_task_returns_empty_collections(
+        self, isolated_client, stale_task
+    ):
+        response = await isolated_client.get(f"/tasks/{stale_task.id}/export")
+
+        assert response.status_code == 200
+        snapshot = response.json()
+        assert snapshot["messages"] == []
+        assert snapshot["task_states"] == []
+
+    async def test_export_nonexistent_task_returns_404(self, isolated_client):
+        response = await isolated_client.get(
+            "/tasks/00000000-0000-0000-0000-000000000000/export"
+        )
+
+        assert response.status_code == 404
+
+    # ---- clean ----
+
+    async def test_clean_force_succeeds_and_clears_all_surfaces(
+        self, isolated_client, isolated_repositories, stale_task, test_agent
+    ):
+        await self._seed_messages(isolated_repositories, stale_task.id, 3)
+        await self._seed_state(isolated_repositories, stale_task.id, test_agent.id)
+
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+
+        assert response.status_code == 200
+        result = response.json()
+        assert result["task_id"] == stale_task.id
+        assert result["messages_deleted"] == 3
+        assert result["task_states_deleted"] == 1
+        assert result["events_deleted"] == 0
+        assert result["cleaned_at"] is not None
+
+        # Verify Mongo surfaces are empty.
+        export_after = (
+            await isolated_client.get(f"/tasks/{stale_task.id}/export")
+        ).json()
+        assert export_after["messages"] == []
+        assert export_after["task_states"] == []
+
+        # Verify tasks.cleaned_at is set.
+        task_after = (await isolated_client.get(f"/tasks/{stale_task.id}")).json()
+        assert task_after["cleaned_at"] is not None
+
+    async def test_clean_resets_agent_task_tracker_cursor(
+        self, isolated_client, isolated_repositories, stale_task, test_agent
+    ):
+        # NOTE: the tracker task_repository.create auto-creates already has cursor=None,
+        # so this only proves the reset path runs and leaves it None (non-None untested).
+        tracker_repo = isolated_repositories["agent_task_tracker_repository"]
+        trackers = await tracker_repo.find_by_field("task_id", stale_task.id)
+        assert len(trackers) == 1
+
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+        assert response.status_code == 200
+
+        trackers_after = await tracker_repo.find_by_field("task_id", stale_task.id)
+        assert len(trackers_after) == 1
+        assert trackers_after[0].last_processed_event_id is None
+
+    async def test_clean_running_task_returns_400(self, isolated_client, running_task):
+        response = await isolated_client.post(
+            f"/tasks/{running_task.id}/clean", json={"force": True}
+        )
+
+        assert response.status_code == 400
+        assert "RUNNING" in response.json()["message"]
+
+    async def test_clean_already_cleaned_task_returns_empty_result(
+        self, isolated_client, isolated_repositories, stale_task
+    ):
+        await self._seed_messages(isolated_repositories, stale_task.id, 2)
+        # First clean — does the work.
+        first = await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+        assert first.status_code == 200
+        first_cleaned_at = first.json()["cleaned_at"]
+
+        # Second clean — should be a no-op returning the prior cleaned_at.
+        second = await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+        assert second.status_code == 200
+        result = second.json()
+        assert result["messages_deleted"] == 0
+        assert result["task_states_deleted"] == 0
+        assert result["events_deleted"] == 0
+        assert result["cleaned_at"] == first_cleaned_at
+
+    async def test_clean_unprocessed_events_returns_400(
+        self, isolated_client, isolated_repositories, stale_task, test_agent
+    ):
+        # Plant an event with no cursor advancement; _has_unprocessed_events will
+        # see the event past the (null) cursor and refuse.
+        event_repo = isolated_repositories["event_repository"]
+        await event_repo.create(
+            id=orm_id(),
+            task_id=stale_task.id,
+            agent_id=test_agent.id,
+            content=None,
+        )
+
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+
+        assert response.status_code == 400
+        assert "unprocessed events" in response.json()["message"]
+
+    async def test_clean_nonexistent_task_returns_404(self, isolated_client):
+        response = await isolated_client.post(
+            "/tasks/00000000-0000-0000-0000-000000000000/clean",
+            json={"force": True},
+        )
+
+        assert response.status_code == 404
+
+    # ---- rehydrate ----
+
+    async def test_round_trip_is_byte_identical(
+        self, isolated_client, isolated_repositories, stale_task, test_agent
+    ):
+        """
+        The load-bearing invariant: export → clean → rehydrate → export
+        yields the same snapshot down to IDs and timestamps. ID preservation
+        is what makes rehydrated tasks indistinguishable from the original.
+        """
+        await self._seed_messages(isolated_repositories, stale_task.id, 3)
+        await self._seed_state(isolated_repositories, stale_task.id, test_agent.id)
+
+        snapshot_before = (
+            await isolated_client.get(f"/tasks/{stale_task.id}/export")
+        ).json()
+
+        clean = await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+        assert clean.status_code == 200
+
+        rehydrate = await isolated_client.post(
+            f"/tasks/{stale_task.id}/rehydrate", json=snapshot_before
+        )
+        assert rehydrate.status_code == 204
+
+        snapshot_after = (
+            await isolated_client.get(f"/tasks/{stale_task.id}/export")
+        ).json()
+
+        assert snapshot_after == snapshot_before
+
+        # Sanity: task is back to active (cleaned_at=None).
+        task_after = (await isolated_client.get(f"/tasks/{stale_task.id}")).json()
+        assert task_after["cleaned_at"] is None
+
+    async def test_rehydrate_active_task_returns_400(self, isolated_client, stale_task):
+        # Task was never cleaned; rehydrate must refuse.
+        payload = {"task_id": stale_task.id, "messages": [], "task_states": []}
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/rehydrate", json=payload
+        )
+
+        assert response.status_code == 400
+        assert "not in cleaned state" in response.json()["message"]
+
+    async def test_rehydrate_task_id_mismatch_returns_400(
+        self, isolated_client, stale_task
+    ):
+        # Clean first so we'd otherwise pass the cleaned_at guard.
+        await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+
+        payload = {
+            "task_id": "00000000-0000-0000-0000-000000000000",
+            "messages": [],
+            "task_states": [],
+        }
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/rehydrate", json=payload
+        )
+
+        assert response.status_code == 400
+        assert "does not match" in response.json()["message"]
+
+    async def test_rehydrate_rejects_mismatched_message_task_id(
+        self, isolated_client, isolated_repositories, stale_task
+    ):
+        """
+        Defense in depth: even when snapshot.task_id matches the path, each
+        embedded message's task_id must also match — otherwise a caller could
+        smuggle messages into a different task's collection through rehydrate.
+        """
+        await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+
+        foreign_task_id = "00000000-0000-0000-0000-000000000000"
+        payload = {
+            "task_id": stale_task.id,
+            "messages": [
+                {
+                    "id": orm_id(),
+                    "task_id": foreign_task_id,
+                    "content": {"type": "text", "author": "user", "content": "x"},
+                    "streaming_status": "DONE",
+                }
+            ],
+            "task_states": [],
+        }
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/rehydrate", json=payload
+        )
+
+        assert response.status_code == 400
+        assert "message[0]" in response.json()["message"]
+
+    async def test_rehydrate_rejects_mismatched_task_state_task_id(
+        self, isolated_client, isolated_repositories, stale_task, test_agent
+    ):
+        await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+
+        foreign_task_id = "00000000-0000-0000-0000-000000000000"
+        payload = {
+            "task_id": stale_task.id,
+            "messages": [],
+            "task_states": [
+                {
+                    "id": orm_id(),
+                    "task_id": foreign_task_id,
+                    "agent_id": test_agent.id,
+                    "state": {"k": "v"},
+                }
+            ],
+        }
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/rehydrate", json=payload
+        )
+
+        assert response.status_code == 400
+        assert "task_states[0]" in response.json()["message"]
+
+    async def test_rehydrate_id_collision_returns_400(
+        self, isolated_client, isolated_repositories, stale_task
+    ):
+        # Seed, snapshot, clean, then re-insert one message so the rehydrate
+        # collides on _id.
+        await self._seed_messages(isolated_repositories, stale_task.id, 2)
+        snapshot = (await isolated_client.get(f"/tasks/{stale_task.id}/export")).json()
+        await isolated_client.post(
+            f"/tasks/{stale_task.id}/clean", json={"force": True}
+        )
+
+        # Plant a colliding doc with the same _id the rehydrate would use.
+        first = snapshot["messages"][0]
+        message_repo = isolated_repositories["task_message_repository"]
+        await message_repo.create(
+            TaskMessageEntity(
+                id=first["id"],
+                task_id=stale_task.id,
+                content=TextContentEntity(
+                    type="text", author="user", content="planted collision"
+                ),
+                streaming_status="DONE",
+            )
+        )
+
+        response = await isolated_client.post(
+            f"/tasks/{stale_task.id}/rehydrate", json=snapshot
+        )
+
+        assert response.status_code == 400
+        assert "duplicate id" in response.json()["message"].lower()
diff --git a/agentex/tests/integration/fixtures/integration_client.py b/agentex/tests/integration/fixtures/integration_client.py
index b715d22..4ba0ceb 100644
--- a/agentex/tests/integration/fixtures/integration_client.py
+++ b/agentex/tests/integration/fixtures/integration_client.py
@@ -383,6 +383,7 @@ async def isolated_integration_app(
     from src.domain.use_cases.messages_use_case import MessagesUseCase
     from src.domain.use_cases.spans_use_case import SpanUseCase
     from src.domain.use_cases.states_use_case import StatesUseCase
+    from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase
     from src.domain.use_cases.tasks_use_case import TasksUseCase
 
     # Create use case factory functions with isolated repositories
@@ -466,6 +467,28 @@ def create_messages_use_case():
 
         return MessagesUseCase(task_message_service=task_message_service)
 
+    def create_task_retention_use_case():
+        """Create TaskRetentionUseCase wired to isolated repos for retention tests."""
+        from src.domain.services.task_message_service import TaskMessageService
+        from src.domain.services.task_retention_service import TaskRetentionService
+        from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase
+
+        task_message_service = TaskMessageService(
+            message_repository=isolated_repositories["task_message_repository"]
+        )
+        retention_service = TaskRetentionService(
+            task_repository=isolated_repositories["task_repository"],
+            task_message_service=task_message_service,
+            task_message_repository=isolated_repositories["task_message_repository"],
+            task_state_repository=isolated_repositories["task_state_repository"],
+            event_repository=isolated_repositories["event_repository"],
+            agent_task_tracker_repository=isolated_repositories[
+                "agent_task_tracker_repository"
+            ],
+            temporal_adapter=isolated_temporal_adapter,
+        )
+        return TaskRetentionUseCase(retention_service=retention_service)
+
     # Import dependency types and repository classes that need to be overridden
     from src.adapters.streams.adapter_redis import RedisStreamRepository
     from src.config.dependencies import (
@@ -510,6 +533,7 @@ def create_messages_use_case():
         AgentTaskTrackerUseCase: create_agent_task_tracker_use_case,
         TasksUseCase: create_tasks_use_case,
         MessagesUseCase: create_messages_use_case,
+        TaskRetentionUseCase: create_task_retention_use_case,
         AgentAPIKeysUseCase: create_agent_api_keys_use_case,
         DeploymentHistoryUseCase: create_deployment_history_use_case,
         # Repositories - these ensure consistent isolated instances