Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
89 changes: 89 additions & 0 deletions eventbridge-scheduler-ai-agent-trigger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Amazon EventBridge Scheduler to Amazon Bedrock AI Agent

This pattern demonstrates how to trigger an Amazon Bedrock AI Agent on a recurring schedule using Amazon EventBridge Scheduler. An orchestrator AWS Lambda function, invoked by the scheduler, sends a task payload to the Bedrock Agent, which processes the input, generates an execution summary, and persists the result to a Amazon DynamoDB table via an action group Lambda.

Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/eventbridge-scheduler-ai-agent-trigger

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [Terraform](https://www.terraform.io/downloads.html) >= 1.0 installed

## Architecture

![Architecture Diagram](Architecture.png)

The pattern deploys the following resources:

1. **Amazon EventBridge Scheduler** – Triggers the orchestrator Lambda on a recurring schedule (default: `rate(1 hour)`).
2. **Orchestrator Lambda** (Python 3.14) – Receives the scheduler event and invokes the Bedrock Agent with a task payload.
3. **Amazon Bedrock Agent** – Processes the task payload, generates an execution summary using a foundation model (default: Claude 3 Haiku), and calls the action group.
4. **Action Group Lambda** (Python 3.14) – Persists execution records to DynamoDB.
5. **Amazon DynamoDB Table** – Stores task execution records.
6. **Amazon SQS Dead-Letter Queue** – Captures failed scheduler invocations after retries are exhausted.

## Deployment Instructions

1. Clone the repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
1. Change directory to the pattern directory:
```
cd serverless-patterns/eventbridge-scheduler-ai-agent-trigger
```
1. Initialize Terraform:
```
terraform init
```
1. Deploy the infrastructure:
```
terraform apply -auto-approve
```
During the prompts, provide values for:
* `aws_region` – AWS region (e.g. `us-east-1`)
* `prefix` – Unique prefix for all resource names

1. Note the outputs from the deployment. These contain the resource names and ARNs used for testing.

## How it works

1. EventBridge Scheduler fires on the configured schedule and invokes the orchestrator Lambda with a JSON payload containing `taskType`, `scheduleName`, and `scheduledTime`.
2. The orchestrator Lambda calls `bedrock-agent-runtime:InvokeAgent` with the payload, targeting the agent alias.
3. The Bedrock Agent parses the payload, generates an executive summary using the foundation model, and calls the `recordTaskExecution` action group.
4. The action group Lambda writes the execution record (task ID, type, scheduled time, summary, and recorded timestamp) to the DynamoDB table.
5. If the scheduler invocation fails after 3 retries, the event is sent to the SQS dead-letter queue.

## Testing

1. Replace `<prefix>` with the prefix chosen during deployment and invoke the orchestrator Lambda function manually:
```
aws lambda invoke \
--function-name <prefix>-agent-orchestrator \
--payload '{"taskType":"scheduled-report","scheduleName":"manual-test","scheduledTime":"2026-03-13T10:00:00Z"}' \
--cli-binary-format raw-in-base64-out \
output.json
```
2. Check the DynamoDB table for the new execution record:
```
aws dynamodb scan --table-name <prefix>-agent-task-executions
```

## Cleanup

1. Destroy the stack:
```
terraform destroy --auto-approve
```
1. Confirm all resources have been removed:
```
terraform show
```
----
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
145 changes: 145 additions & 0 deletions eventbridge-scheduler-ai-agent-trigger/action_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import boto3
import json
import os
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["DYNAMODB_TABLE"])


def lambda_handler(event, context):
"""
Bedrock Agent Action Group Lambda.
Called by the agent to persist task execution records in DynamoDB.
"""
logger.info("Action group event: %s", json.dumps(event))

api_path = event.get("apiPath", "")
action_group = event.get("actionGroup", "")
http_method = event.get("httpMethod", "")
params = _extract_parameters(event)

logger.info("API path: %s | params: %s", api_path, json.dumps(params))

if api_path == "/record-task-execution":
result = _record_task_execution(params)
elif api_path == "/get-last-execution":
result = _get_last_execution(params)
else:
result = {
"statusCode": 400,
"body": json.dumps({"error": f"Unknown API path: {api_path}"}),
}

return {
"messageVersion": "1.0",
"response": {
"actionGroup": action_group,
"apiPath": api_path,
"httpMethod": http_method,
"httpStatusCode": result["statusCode"],
"responseBody": {
"application/json": {"body": result["body"]}
},
},
}


# ──────────────────────────────────────────
# Action handlers
# ──────────────────────────────────────────

def _record_task_execution(params: dict) -> dict:
"""Write an execution record to DynamoDB."""
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

item = {
"TaskId": params.get("taskId", f"task-{now}"),
"TaskType": params.get("taskType", "unknown"),
"ScheduledTime": params.get("scheduledTime", now),
"ExecutionSummary": params.get("executionSummary", ""),
"Status": "COMPLETED",
"RecordedAt": now,
}

table.put_item(Item=item)
logger.info("Recorded task execution: %s", item["TaskId"])

return {
"statusCode": 200,
"body": json.dumps(
{
"message": f"Task execution {item['TaskId']} recorded successfully",
"taskId": item["TaskId"],
"recordedAt": now,
}
),
}


def _get_last_execution(params: dict) -> dict:
"""Scan for the most recent execution (simple approach for demo)."""
task_type = params.get("taskType", "scheduled-report")

response = table.scan(
FilterExpression="TaskType = :tt",
ExpressionAttributeValues={":tt": task_type},
Limit=10,
)

items = sorted(
response.get("Items", []),
key=lambda x: x.get("RecordedAt", ""),
reverse=True,
)

if items:
last = items[0]
return {
"statusCode": 200,
"body": json.dumps(
{
"taskId": last["TaskId"],
"taskType": last["TaskType"],
"scheduledTime": last["ScheduledTime"],
"executionSummary": last.get("ExecutionSummary", ""),
"recordedAt": last["RecordedAt"],
}
),
}

return {
"statusCode": 404,
"body": json.dumps(
{"message": f"No executions found for task type: {task_type}"}
),
}


# ──────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────

def _extract_parameters(event: dict) -> dict:
"""Pull parameters from the Bedrock Agent request body and/or parameters list."""
params = {}

# From requestBody (POST actions)
properties = (
event.get("requestBody", {})
.get("content", {})
.get("application/json", {})
.get("properties", [])
)
for prop in properties:
params[prop["name"]] = prop.get("value", "")

# From top-level parameters (GET actions)
for param in event.get("parameters", []):
params[param["name"]] = param.get("value", "")

return params
106 changes: 106 additions & 0 deletions eventbridge-scheduler-ai-agent-trigger/api_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"openapi": "3.0.0",
"info": {
"title": "Scheduled Task Execution API",
"version": "1.0.0",
"description": "Actions for recording and retrieving scheduled AI agent task executions"
},
"paths": {
"/record-task-execution": {
"post": {
"operationId": "recordTaskExecution",
"summary": "Record a scheduled task execution in the tracking database",
"description": "Persists a task execution record with task ID, type, timestamp, and an AI-generated summary to DynamoDB",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"taskId",
"taskType",
"scheduledTime",
"executionSummary"
],
"properties": {
"taskId": {
"type": "string",
"description": "Unique identifier for this task execution — combine scheduleName and scheduledTime"
},
"taskType": {
"type": "string",
"description": "The category of the scheduled task (e.g. scheduled-report)"
},
"scheduledTime": {
"type": "string",
"description": "ISO 8601 UTC timestamp when the task was scheduled to run"
},
"executionSummary": {
"type": "string",
"description": "AI-generated summary describing the task execution and its outcome"
}
}
}
}
}
},
"responses": {
"200": {
"description": "Execution recorded successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"message": { "type": "string" },
"taskId": { "type": "string" },
"recordedAt": { "type": "string" }
}
}
}
}
}
}
}
},
"/get-last-execution": {
"get": {
"operationId": "getLastExecution",
"summary": "Get the most recent task execution for a given task type",
"description": "Retrieves the latest execution record from DynamoDB filtered by task type",
"parameters": [
{
"name": "taskType",
"in": "query",
"required": true,
"schema": { "type": "string" },
"description": "Task type to look up (e.g. scheduled-report)"
}
],
"responses": {
"200": {
"description": "Last execution found",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"taskId": { "type": "string" },
"taskType": { "type": "string" },
"scheduledTime": { "type": "string" },
"executionSummary": { "type": "string" },
"recordedAt": { "type": "string" }
}
}
}
}
},
"404": {
"description": "No executions found for the given task type"
}
}
}
}
}
}
Loading