diff --git a/eventbridge-scheduler-ai-agent-trigger/Architecture.png b/eventbridge-scheduler-ai-agent-trigger/Architecture.png new file mode 100644 index 000000000..482111a97 Binary files /dev/null and b/eventbridge-scheduler-ai-agent-trigger/Architecture.png differ diff --git a/eventbridge-scheduler-ai-agent-trigger/README.md b/eventbridge-scheduler-ai-agent-trigger/README.md new file mode 100644 index 000000000..54e5e00ea --- /dev/null +++ b/eventbridge-scheduler-ai-agent-trigger/README.md @@ -0,0 +1,89 @@ +# Amazon EventBridge Scheduler to Amazon Bedrock AI Agent + +This pattern demonstrates how to trigger an Amazon Bedrock AI Agent on a recurring schedule using Amazon EventBridge Scheduler. An orchestrator AWS Lambda function, invoked by the scheduler, sends a task payload to the Bedrock Agent, which processes the input, generates an execution summary, and persists the result to a Amazon DynamoDB table via an action group Lambda. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/eventbridge-scheduler-ai-agent-trigger + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [Terraform](https://www.terraform.io/downloads.html) >= 1.0 installed + +## Architecture + +![Architecture Diagram](Architecture.png) + +The pattern deploys the following resources: + +1. **Amazon EventBridge Scheduler** – Triggers the orchestrator Lambda on a recurring schedule (default: `rate(1 hour)`). +2. **Orchestrator Lambda** (Python 3.14) – Receives the scheduler event and invokes the Bedrock Agent with a task payload. +3. **Amazon Bedrock Agent** – Processes the task payload, generates an execution summary using a foundation model (default: Claude 3 Haiku), and calls the action group. +4. **Action Group Lambda** (Python 3.14) – Persists execution records to DynamoDB. +5. **Amazon DynamoDB Table** – Stores task execution records. +6. **Amazon SQS Dead-Letter Queue** – Captures failed scheduler invocations after retries are exhausted. + +## Deployment Instructions + +1. Clone the repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd serverless-patterns/eventbridge-scheduler-ai-agent-trigger + ``` +1. Initialize Terraform: + ``` + terraform init + ``` +1. Deploy the infrastructure: + ``` + terraform apply -auto-approve + ``` + During the prompts, provide values for: + * `aws_region` – AWS region (e.g. `us-east-1`) + * `prefix` – Unique prefix for all resource names + +1. Note the outputs from the deployment. These contain the resource names and ARNs used for testing. + +## How it works + +1. EventBridge Scheduler fires on the configured schedule and invokes the orchestrator Lambda with a JSON payload containing `taskType`, `scheduleName`, and `scheduledTime`. +2. The orchestrator Lambda calls `bedrock-agent-runtime:InvokeAgent` with the payload, targeting the agent alias. +3. The Bedrock Agent parses the payload, generates an executive summary using the foundation model, and calls the `recordTaskExecution` action group. +4. The action group Lambda writes the execution record (task ID, type, scheduled time, summary, and recorded timestamp) to the DynamoDB table. +5. If the scheduler invocation fails after 3 retries, the event is sent to the SQS dead-letter queue. + +## Testing + +1. Replace `` with the prefix chosen during deployment and invoke the orchestrator Lambda function manually: + ``` + aws lambda invoke \ + --function-name -agent-orchestrator \ + --payload '{"taskType":"scheduled-report","scheduleName":"manual-test","scheduledTime":"2026-03-13T10:00:00Z"}' \ + --cli-binary-format raw-in-base64-out \ + output.json + ``` +2. Check the DynamoDB table for the new execution record: + ``` + aws dynamodb scan --table-name -agent-task-executions + ``` + +## Cleanup + +1. Destroy the stack: + ``` + terraform destroy --auto-approve + ``` +1. Confirm all resources have been removed: + ``` + terraform show + ``` +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/eventbridge-scheduler-ai-agent-trigger/action_group.py b/eventbridge-scheduler-ai-agent-trigger/action_group.py new file mode 100644 index 000000000..ad734974f --- /dev/null +++ b/eventbridge-scheduler-ai-agent-trigger/action_group.py @@ -0,0 +1,145 @@ +import boto3 +import json +import os +import logging +from datetime import datetime, timezone + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +dynamodb = boto3.resource("dynamodb") +table = dynamodb.Table(os.environ["DYNAMODB_TABLE"]) + + +def lambda_handler(event, context): + """ + Bedrock Agent Action Group Lambda. + Called by the agent to persist task execution records in DynamoDB. + """ + logger.info("Action group event: %s", json.dumps(event)) + + api_path = event.get("apiPath", "") + action_group = event.get("actionGroup", "") + http_method = event.get("httpMethod", "") + params = _extract_parameters(event) + + logger.info("API path: %s | params: %s", api_path, json.dumps(params)) + + if api_path == "/record-task-execution": + result = _record_task_execution(params) + elif api_path == "/get-last-execution": + result = _get_last_execution(params) + else: + result = { + "statusCode": 400, + "body": json.dumps({"error": f"Unknown API path: {api_path}"}), + } + + return { + "messageVersion": "1.0", + "response": { + "actionGroup": action_group, + "apiPath": api_path, + "httpMethod": http_method, + "httpStatusCode": result["statusCode"], + "responseBody": { + "application/json": {"body": result["body"]} + }, + }, + } + + +# ────────────────────────────────────────── +# Action handlers +# ────────────────────────────────────────── + +def _record_task_execution(params: dict) -> dict: + """Write an execution record to DynamoDB.""" + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + item = { + "TaskId": params.get("taskId", f"task-{now}"), + "TaskType": params.get("taskType", "unknown"), + "ScheduledTime": params.get("scheduledTime", now), + "ExecutionSummary": params.get("executionSummary", ""), + "Status": "COMPLETED", + "RecordedAt": now, + } + + table.put_item(Item=item) + logger.info("Recorded task execution: %s", item["TaskId"]) + + return { + "statusCode": 200, + "body": json.dumps( + { + "message": f"Task execution {item['TaskId']} recorded successfully", + "taskId": item["TaskId"], + "recordedAt": now, + } + ), + } + + +def _get_last_execution(params: dict) -> dict: + """Scan for the most recent execution (simple approach for demo).""" + task_type = params.get("taskType", "scheduled-report") + + response = table.scan( + FilterExpression="TaskType = :tt", + ExpressionAttributeValues={":tt": task_type}, + Limit=10, + ) + + items = sorted( + response.get("Items", []), + key=lambda x: x.get("RecordedAt", ""), + reverse=True, + ) + + if items: + last = items[0] + return { + "statusCode": 200, + "body": json.dumps( + { + "taskId": last["TaskId"], + "taskType": last["TaskType"], + "scheduledTime": last["ScheduledTime"], + "executionSummary": last.get("ExecutionSummary", ""), + "recordedAt": last["RecordedAt"], + } + ), + } + + return { + "statusCode": 404, + "body": json.dumps( + {"message": f"No executions found for task type: {task_type}"} + ), + } + + +# ────────────────────────────────────────── +# Helpers +# ────────────────────────────────────────── + +def _extract_parameters(event: dict) -> dict: + """Pull parameters from the Bedrock Agent request body and/or parameters list.""" + params = {} + + # From requestBody (POST actions) + properties = ( + event.get("requestBody", {}) + .get("content", {}) + .get("application/json", {}) + .get("properties", []) + ) + for prop in properties: + params[prop["name"]] = prop.get("value", "") + + # From top-level parameters (GET actions) + for param in event.get("parameters", []): + params[param["name"]] = param.get("value", "") + + return params \ No newline at end of file diff --git a/eventbridge-scheduler-ai-agent-trigger/api_schema.json b/eventbridge-scheduler-ai-agent-trigger/api_schema.json new file mode 100644 index 000000000..6eef8a289 --- /dev/null +++ b/eventbridge-scheduler-ai-agent-trigger/api_schema.json @@ -0,0 +1,106 @@ +{ + "openapi": "3.0.0", + "info": { + "title": "Scheduled Task Execution API", + "version": "1.0.0", + "description": "Actions for recording and retrieving scheduled AI agent task executions" + }, + "paths": { + "/record-task-execution": { + "post": { + "operationId": "recordTaskExecution", + "summary": "Record a scheduled task execution in the tracking database", + "description": "Persists a task execution record with task ID, type, timestamp, and an AI-generated summary to DynamoDB", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "taskId", + "taskType", + "scheduledTime", + "executionSummary" + ], + "properties": { + "taskId": { + "type": "string", + "description": "Unique identifier for this task execution — combine scheduleName and scheduledTime" + }, + "taskType": { + "type": "string", + "description": "The category of the scheduled task (e.g. scheduled-report)" + }, + "scheduledTime": { + "type": "string", + "description": "ISO 8601 UTC timestamp when the task was scheduled to run" + }, + "executionSummary": { + "type": "string", + "description": "AI-generated summary describing the task execution and its outcome" + } + } + } + } + } + }, + "responses": { + "200": { + "description": "Execution recorded successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "message": { "type": "string" }, + "taskId": { "type": "string" }, + "recordedAt": { "type": "string" } + } + } + } + } + } + } + } + }, + "/get-last-execution": { + "get": { + "operationId": "getLastExecution", + "summary": "Get the most recent task execution for a given task type", + "description": "Retrieves the latest execution record from DynamoDB filtered by task type", + "parameters": [ + { + "name": "taskType", + "in": "query", + "required": true, + "schema": { "type": "string" }, + "description": "Task type to look up (e.g. scheduled-report)" + } + ], + "responses": { + "200": { + "description": "Last execution found", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "taskId": { "type": "string" }, + "taskType": { "type": "string" }, + "scheduledTime": { "type": "string" }, + "executionSummary": { "type": "string" }, + "recordedAt": { "type": "string" } + } + } + } + } + }, + "404": { + "description": "No executions found for the given task type" + } + } + } + } + } +} \ No newline at end of file diff --git a/eventbridge-scheduler-ai-agent-trigger/example-pattern.json b/eventbridge-scheduler-ai-agent-trigger/example-pattern.json new file mode 100644 index 000000000..0278ecb49 --- /dev/null +++ b/eventbridge-scheduler-ai-agent-trigger/example-pattern.json @@ -0,0 +1,58 @@ +{ + "title": "Trigger AI Agent with Amazon EventBridge Scheduler", + "description": "Create a EventBridge scheduler which invokes a Bedrock Agent upon triggering.", + "language": "Python", + "level": "300", + "framework": "Terraform", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to trigger an Amazon Bedrock AI Agent on a recurring schedule using Amazon EventBridge Scheduler. An orchestrator AWS Lambda function, invoked by the scheduler, sends a task payload to the Bedrock Agent, which processes the input, generates an execution summary, and persists the result to a DynamoDB table via an action group Lambda. The pattern includes retry logic, a dead-letter queue for failed invocations, and least-privilege IAM policies scoped to the agent alias ARN." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-scheduler-ai-agent-trigger", + "templateURL": "serverless-patterns/eventbridge-scheduler-ai-agent-trigger", + "projectFolder": "eventbridge-scheduler-ai-agent-trigger", + "templateFile": "main.tf" + } + }, + "resources": { + "bullets": [ + { + "text": "Invoke a Lambda function on a schedule", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-eventbridge-scheduler.html" + }, + { + "text": "Allow users to view information about and invoke an agent", + "link": "https://docs.aws.amazon.com/bedrock/latest/userguide/security_iam_id-based-policy-examples-agent.html#security_iam_id-based-policy-examples-perform-actions-agent" + } + ] + }, + "deploy": { + "text": [ + "terraform init", + "terraform apply" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "terraform destroy", + "terraform show" + ] + }, + "authors": [ + { + "name": "Rajil Paloth", + "image": "https://i.ibb.co/r2TsqGf6/Passport-size.jpg", + "bio": "ProServe Delivery Consultant at AWS", + "linkedin": "paloth" + } + ] +} diff --git a/eventbridge-scheduler-ai-agent-trigger/main.tf b/eventbridge-scheduler-ai-agent-trigger/main.tf new file mode 100644 index 000000000..731337070 --- /dev/null +++ b/eventbridge-scheduler-ai-agent-trigger/main.tf @@ -0,0 +1,533 @@ +terraform { + required_version = ">= 1.0" + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 6.32.1" + } + archive = { + source = "hashicorp/archive" + version = "~> 2.7" + } + } +} + +provider "aws" { + region = var.aws_region +} + +############################################################ +# Variables +############################################################ + +variable "aws_region" { + description = "AWS region for resources (e.g. us-east-1, us-west-2)" + type = string + + validation { + condition = can(regex("^[a-z]{2}-[a-z]+-[0-9]+$", var.aws_region)) + error_message = "Must be a valid AWS region (e.g. us-east-1, eu-west-2)." + } +} + +variable "prefix" { + description = "Unique prefix for all resource names" + type = string + + validation { + condition = can(regex("^[a-z0-9][a-z0-9\\-]{1,20}$", var.prefix)) + error_message = "Prefix must be 2-21 lowercase alphanumeric characters or hyphens." + } +} + +variable "bedrock_model_id" { + description = "Bedrock foundation model ID for the agent" + type = string + default = "anthropic.claude-3-haiku-20240307-v1:0" +} + +variable "schedule_expression" { + description = "EventBridge Scheduler expression (e.g. rate(1 hour), cron(0 9 * * ? *))" + type = string + default = "rate(1 hour)" +} + +variable "log_retention_days" { + description = "CloudWatch log retention in days (0 = never expire)" + type = number + default = 14 +} + +############################################################ +# Data Sources +############################################################ + +data "aws_caller_identity" "current" {} +data "aws_region" "current" {} + +############################################################ +# 1. DYNAMODB TABLE (agent writes execution records here) +############################################################ + +resource "aws_dynamodb_table" "task_executions" { + name = "${var.prefix}-agent-task-executions" + billing_mode = "PAY_PER_REQUEST" + hash_key = "TaskId" + + attribute { + name = "TaskId" + type = "S" + } + + tags = { + Project = "${var.prefix}-ai-agent-scheduler" + } +} + +############################################################ +# 2. ACTION GROUP LAMBDA (Bedrock Agent calls this to +# read/write DynamoDB) +############################################################ + +resource "aws_iam_role" "action_group_role" { + name = "${var.prefix}-action-group-lambda-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Action = "sts:AssumeRole" + Effect = "Allow" + Principal = { Service = "lambda.amazonaws.com" } + }] + }) +} + +resource "aws_iam_role_policy_attachment" "action_group_basic" { + role = aws_iam_role.action_group_role.name + policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" +} + +resource "aws_iam_role_policy" "action_group_dynamodb" { + name = "${var.prefix}-action-group-dynamodb" + role = aws_iam_role.action_group_role.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Effect = "Allow" + Action = [ + "dynamodb:PutItem", + "dynamodb:GetItem", + "dynamodb:UpdateItem", + "dynamodb:Query", + "dynamodb:Scan" + ] + Resource = [ + aws_dynamodb_table.task_executions.arn, + "${aws_dynamodb_table.task_executions.arn}/index/*" + ] + }] + }) +} + +resource "aws_cloudwatch_log_group" "action_group_logs" { + name = "/aws/lambda/${var.prefix}-agent-action-group" + retention_in_days = var.log_retention_days +} + +data "archive_file" "action_group" { + type = "zip" + output_path = "${path.module}/action_group.zip" + + source { + filename = "action_group.py" + content = file("${path.module}/action_group.py") + } +} + +resource "aws_lambda_function" "action_group" { + function_name = "${var.prefix}-agent-action-group" + role = aws_iam_role.action_group_role.arn + handler = "action_group.lambda_handler" + runtime = "python3.14" + timeout = 30 + memory_size = 128 + filename = data.archive_file.action_group.output_path + source_code_hash = data.archive_file.action_group.output_base64sha256 + + environment { + variables = { + DYNAMODB_TABLE = aws_dynamodb_table.task_executions.name + PREFIX = var.prefix + } + } + + depends_on = [ + aws_cloudwatch_log_group.action_group_logs, + aws_iam_role_policy_attachment.action_group_basic, + aws_iam_role_policy.action_group_dynamodb, + ] +} + +############################################################ +# 3. BEDROCK AGENT (AI agent with action group) +############################################################ + +# --- Agent IAM Role --- + +resource "aws_iam_role" "bedrock_agent_role" { + name = "${var.prefix}-bedrock-agent-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Action = "sts:AssumeRole" + Effect = "Allow" + Principal = { Service = "bedrock.amazonaws.com" } + Condition = { + StringEquals = { + "aws:SourceAccount" = data.aws_caller_identity.current.account_id + } + } + }] + }) +} + +resource "aws_iam_role_policy" "bedrock_agent_model" { + name = "${var.prefix}-bedrock-invoke-model" + role = aws_iam_role.bedrock_agent_role.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Effect = "Allow" + Action = "bedrock:InvokeModel" + Resource = "arn:aws:bedrock:${var.aws_region}::foundation-model/${var.bedrock_model_id}" + }] + }) +} + +# --- Agent --- + +resource "aws_bedrockagent_agent" "task_agent" { + agent_name = "${var.prefix}-scheduled-task-agent" + agent_resource_role_arn = aws_iam_role.bedrock_agent_role.arn + foundation_model = var.bedrock_model_id + idle_session_ttl_in_seconds = 600 + prepare_agent = false # prepared after action group is attached + + instruction = <<-EOT + You are a Scheduled Task Execution Agent. You run on a schedule via + Amazon EventBridge Scheduler. Each time you are invoked you must: + + 1. PARSE the task payload provided to you (JSON with taskType, scheduleName, + and scheduledTime). + 2. GENERATE a brief, one-paragraph executive summary describing the task + execution — include the task type, timestamp, and a note that the + execution completed successfully. + 3. CALL the recordTaskExecution action to persist the execution record. + Use the values from the payload for taskId (use scheduleName + timestamp + combined), taskType, scheduledTime, and pass your generated summary as + the executionSummary. + 4. CONFIRM success by returning a short completion message. + + Always use the exact values from the payload. Never fabricate timestamps + or identifiers. + EOT +} + +# --- Action Group --- + +resource "aws_bedrockagent_agent_action_group" "task_actions" { + action_group_name = "${var.prefix}-task-execution-actions" + agent_id = aws_bedrockagent_agent.task_agent.agent_id + agent_version = "DRAFT" + skip_resource_in_use_check = true + + action_group_executor { + lambda = aws_lambda_function.action_group.arn + } + + api_schema { + payload = file("${path.module}/api_schema.json") + } +} + +# --- Allow Bedrock to invoke Action Group Lambda --- + +resource "aws_lambda_permission" "allow_bedrock" { + statement_id = "AllowBedrockAgentInvoke" + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.action_group.function_name + principal = "bedrock.amazonaws.com" + source_arn = aws_bedrockagent_agent.task_agent.agent_arn +} + +# --- Prepare Agent (must happen after action group is attached) --- + +resource "terraform_data" "prepare_agent" { + depends_on = [ + aws_bedrockagent_agent_action_group.task_actions, + aws_lambda_permission.allow_bedrock, + ] + + triggers_replace = [ + aws_bedrockagent_agent.task_agent.agent_id, + aws_bedrockagent_agent_action_group.task_actions.action_group_name, + ] + + provisioner "local-exec" { + command = "aws bedrock-agent prepare-agent --agent-id ${aws_bedrockagent_agent.task_agent.agent_id} --region ${var.aws_region} && sleep 15" + } +} + +# --- Agent Alias (points to the prepared version) --- + +resource "aws_bedrockagent_agent_alias" "live" { + agent_alias_name = "${var.prefix}-live" + agent_id = aws_bedrockagent_agent.task_agent.agent_id + description = "Live alias for scheduled task agent" + + depends_on = [terraform_data.prepare_agent] +} + +############################################################ +# 4. ORCHESTRATOR LAMBDA (Scheduler invokes this → +# it invokes the Bedrock Agent) +############################################################ + +resource "aws_iam_role" "orchestrator_role" { + name = "${var.prefix}-orchestrator-lambda-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Action = "sts:AssumeRole" + Effect = "Allow" + Principal = { Service = "lambda.amazonaws.com" } + }] + }) +} + +resource "aws_iam_role_policy_attachment" "orchestrator_basic" { + role = aws_iam_role.orchestrator_role.name + policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" +} + +# ── InvokeAgent requires permission on the agent ALIAS ARN ── +# ARN format: arn:aws:bedrock:{region}:{account}:agent-alias/{agent-id}/{alias-id} +# The alias resource exposes this as agent_alias_arn. +# We also grant access to all aliases of this agent so the role +# continues to work if a new alias is created later. + +resource "aws_iam_role_policy" "orchestrator_bedrock" { + name = "${var.prefix}-orchestrator-invoke-agent" + role = aws_iam_role.orchestrator_role.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Sid = "InvokeBedrockAgentAlias" + Effect = "Allow" + Action = "bedrock:InvokeAgent" + Resource = [ + # Specific alias created by this stack + aws_bedrockagent_agent_alias.live.agent_alias_arn, + # Wildcard for any future alias of the same agent + "arn:aws:bedrock:${var.aws_region}:${data.aws_caller_identity.current.account_id}:agent-alias/${aws_bedrockagent_agent.task_agent.agent_id}/*" + ] + }] + }) +} + +resource "aws_cloudwatch_log_group" "orchestrator_logs" { + name = "/aws/lambda/${var.prefix}-agent-orchestrator" + retention_in_days = var.log_retention_days +} + +data "archive_file" "orchestrator" { + type = "zip" + output_path = "${path.module}/orchestrator.zip" + + source { + filename = "orchestrator.py" + content = file("${path.module}/orchestrator.py") + } +} + +resource "aws_lambda_function" "orchestrator" { + function_name = "${var.prefix}-agent-orchestrator" + role = aws_iam_role.orchestrator_role.arn + handler = "orchestrator.lambda_handler" + runtime = "python3.14" + timeout = 120 + memory_size = 256 + filename = data.archive_file.orchestrator.output_path + source_code_hash = data.archive_file.orchestrator.output_base64sha256 + + environment { + variables = { + BEDROCK_AGENT_ID = aws_bedrockagent_agent.task_agent.agent_id + BEDROCK_AGENT_ALIAS_ID = aws_bedrockagent_agent_alias.live.agent_alias_id + PREFIX = var.prefix + } + } + + depends_on = [ + aws_cloudwatch_log_group.orchestrator_logs, + aws_iam_role_policy_attachment.orchestrator_basic, + aws_iam_role_policy.orchestrator_bedrock, + ] +} + +############################################################ +# 5. SQS DEAD-LETTER QUEUE (captures failed scheduler +# invocations after retries are exhausted) +############################################################ + +resource "aws_sqs_queue" "scheduler_dlq" { + name = "${var.prefix}-scheduler-dlq" + message_retention_seconds = 1209600 # 14 days +} + +############################################################ +# 6. EVENTBRIDGE SCHEDULER (triggers the orchestrator +# on a recurring schedule) +############################################################ + +resource "aws_iam_role" "scheduler_role" { + name = "${var.prefix}-scheduler-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Action = "sts:AssumeRole" + Effect = "Allow" + Principal = { Service = "scheduler.amazonaws.com" } + }] + }) +} + +resource "aws_iam_role_policy" "scheduler_invoke_lambda" { + name = "${var.prefix}-scheduler-invoke-lambda" + role = aws_iam_role.scheduler_role.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Sid = "InvokeLambda" + Effect = "Allow" + Action = "lambda:InvokeFunction" + Resource = aws_lambda_function.orchestrator.arn + }, + { + Sid = "SendToDLQ" + Effect = "Allow" + Action = "sqs:SendMessage" + Resource = aws_sqs_queue.scheduler_dlq.arn + } + ] + }) +} + +resource "aws_sqs_queue_policy" "allow_scheduler" { + queue_url = aws_sqs_queue.scheduler_dlq.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Sid = "AllowSchedulerSendMessage" + Effect = "Allow" + Principal = { Service = "scheduler.amazonaws.com" } + Action = "sqs:SendMessage" + Resource = aws_sqs_queue.scheduler_dlq.arn + Condition = { + ArnEquals = { + "aws:SourceArn" = "arn:aws:scheduler:${var.aws_region}:${data.aws_caller_identity.current.account_id}:schedule/default/${var.prefix}-trigger-ai-agent" + } + } + }] + }) +} + +resource "aws_cloudwatch_log_group" "scheduler_logs" { + name = "/aws/scheduler/${var.prefix}-trigger-ai-agent" + retention_in_days = var.log_retention_days +} + +resource "aws_scheduler_schedule" "trigger_agent" { + name = "${var.prefix}-trigger-ai-agent" + schedule_expression = var.schedule_expression + + flexible_time_window { + mode = "OFF" + } + + target { + arn = aws_lambda_function.orchestrator.arn + role_arn = aws_iam_role.scheduler_role.arn + + input = jsonencode({ + taskType = "scheduled-report" + scheduleName = "${var.prefix}-trigger-ai-agent" + scheduledTime = "REPLACED_AT_RUNTIME" + }) + + retry_policy { + maximum_retry_attempts = 3 + maximum_event_age_in_seconds = 3600 + } + + dead_letter_config { + arn = aws_sqs_queue.scheduler_dlq.arn + } + } + + depends_on = [aws_cloudwatch_log_group.scheduler_logs] +} + +############################################################ +# 7. OUTPUTS +############################################################ + +output "prefix" { + value = var.prefix +} + +output "schedule_name" { + value = aws_scheduler_schedule.trigger_agent.name +} + +output "schedule_arn" { + value = aws_scheduler_schedule.trigger_agent.arn +} + +output "orchestrator_lambda_name" { + value = aws_lambda_function.orchestrator.function_name +} + +output "action_group_lambda_name" { + value = aws_lambda_function.action_group.function_name +} + +output "bedrock_agent_id" { + value = aws_bedrockagent_agent.task_agent.agent_id +} + +output "bedrock_agent_alias_id" { + value = aws_bedrockagent_agent_alias.live.agent_alias_id +} + +output "bedrock_agent_alias_arn" { + value = aws_bedrockagent_agent_alias.live.agent_alias_arn + description = "The alias ARN that the orchestrator Lambda is authorized to invoke" +} + +output "dynamodb_table_name" { + value = aws_dynamodb_table.task_executions.name +} + +output "dlq_queue_url" { + value = aws_sqs_queue.scheduler_dlq.url +} \ No newline at end of file diff --git a/eventbridge-scheduler-ai-agent-trigger/orchestrator.py b/eventbridge-scheduler-ai-agent-trigger/orchestrator.py new file mode 100644 index 000000000..3bc6577c1 --- /dev/null +++ b/eventbridge-scheduler-ai-agent-trigger/orchestrator.py @@ -0,0 +1,72 @@ +import boto3 +import json +import os +import uuid +import logging +from datetime import datetime, timezone + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +bedrock_agent_runtime = boto3.client("bedrock-agent-runtime") + +AGENT_ID = os.environ["BEDROCK_AGENT_ID"] +AGENT_ALIAS_ID = os.environ["BEDROCK_AGENT_ALIAS_ID"] + + +def lambda_handler(event, context): + """ + Orchestrator Lambda — invoked by EventBridge Scheduler. + Parses the schedule payload, enriches it with a real timestamp, + and sends it to the Bedrock Agent for processing. + """ + logger.info("Received event: %s", json.dumps(event)) + + # ── Enrich the payload with the actual invocation time ── + task_type = event.get("taskType", "scheduled-report") + schedule_name = event.get("scheduleName", "unknown-schedule") + scheduled_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + prompt = ( + f"Process this scheduled task execution:\n\n" + f'{{"taskType": "{task_type}", ' + f'"scheduleName": "{schedule_name}", ' + f'"scheduledTime": "{scheduled_time}"}}\n\n' + f"Parse the payload, generate a summary, and record the execution." + ) + + # ── Invoke the Bedrock Agent ── + session_id = str(uuid.uuid4()) + logger.info( + "Invoking agent %s (alias %s) | session %s", + AGENT_ID, + AGENT_ALIAS_ID, + session_id, + ) + + response = bedrock_agent_runtime.invoke_agent( + agentId=AGENT_ID, + agentAliasId=AGENT_ALIAS_ID, + sessionId=session_id, + inputText=prompt, + ) + + # ── Collect the streaming response ── + agent_response = "" + for stream_event in response.get("completion", []): + if "chunk" in stream_event: + agent_response += stream_event["chunk"]["bytes"].decode("utf-8") + + logger.info("Agent response: %s", agent_response) + + return { + "statusCode": 200, + "body": json.dumps( + { + "sessionId": session_id, + "taskType": task_type, + "scheduledTime": scheduled_time, + "agentResponse": agent_response, + } + ), + } \ No newline at end of file