3 changes: 3 additions & 0 deletions app/api/v1/base.rb
@@ -17,6 +17,7 @@
require 'v1/indexed_resources'
require 'v1/indexer'
require 'v1/envelope_communities'
require 'v1/workflows'

module API
module V1
@@ -64,6 +65,8 @@ class Base < Grape::API
mount API::V1::Organizations
mount API::V1::Publishers
end

mount API::V1::Workflows
end
end
end
61 changes: 61 additions & 0 deletions app/api/v1/workflows.rb
@@ -0,0 +1,61 @@
require 'mountable_api'
require 'helpers/shared_helpers'

module API
module V1
# Endpoints for operational workflows (called by Argo Workflows)
class Workflows < MountableAPI
mounted do
helpers SharedHelpers

before do
authenticate!
end

resource :workflows do
desc 'Indexes all S3 JSON-LD graphs to Elasticsearch. ' \
'S3 is treated as the source of truth. ' \
'Called by Argo Workflows for orchestration.'
post 'index-all-s3-to-es' do
authorize :workflow, :trigger?

bucket_name = ENV['ENVELOPE_GRAPHS_BUCKET']
error!({ error: 'ENVELOPE_GRAPHS_BUCKET not configured' }, 500) unless bucket_name

es_address = ENV['ELASTICSEARCH_ADDRESS']
error!({ error: 'ELASTICSEARCH_ADDRESS not configured' }, 500) unless es_address

s3 = Aws::S3::Resource.new(region: ENV['AWS_REGION'].presence)
bucket = s3.bucket(bucket_name)

errors = {}
processed = 0

bucket.objects.each do |object|
next unless object.key.end_with?('.json')

processed += 1

begin
IndexS3GraphToEs.call(object.key)
rescue StandardError => e
errors[object.key] = "#{e.class}: #{e.message}"
end
end

status_code = errors.empty? ? 200 : 207

status status_code
{
message: errors.empty? ? 'Indexing completed successfully' : 'Indexing completed with errors',
processed: processed,
errors_count: errors.size,
errors: errors.first(100).to_h
}
end
end
end
end
end
end
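
For a quick smoke test outside of Argo, the endpoint can be hit directly with an admin bearer token. The base URL and token below are placeholders; the response fields mirror the hash built above, returned with HTTP 200 when `errors` is empty and 207 otherwise.

```bash
# Placeholder token and host; any JWT for a user passing WorkflowPolicy#trigger? works.
TOKEN="<admin-jwt>"

curl -s -w '\nHTTP %{http_code}\n' -X POST \
  "http://localhost:9292/workflows/index-all-s3-to-es" \
  -H "Authorization: Bearer $TOKEN"

# Illustrative response:
# {"message":"Indexing completed successfully","processed":1234,"errors_count":0,"errors":{}}
# HTTP 200
```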
12 changes: 12 additions & 0 deletions app/policies/workflow_policy.rb
@@ -0,0 +1,12 @@
require_relative 'application_policy'

# Specifies policies for workflow operations
class WorkflowPolicy < ApplicationPolicy
def trigger?
user.admin?
end

def show?
user.admin?
end
end
85 changes: 85 additions & 0 deletions app/services/index_s3_graph_to_es.rb
@@ -0,0 +1,85 @@
# Indexes a JSON-LD graph from S3 directly to Elasticsearch
# Does not require database access - S3 is the source of truth
class IndexS3GraphToEs
attr_reader :s3_key, :community_name, :ctid

def initialize(s3_key)
@s3_key = s3_key
parse_s3_key
end

class << self
def call(s3_key)
new(s3_key).call
end
end

def call
return unless elasticsearch_address

client.index(
body: graph_json,
id: ctid,
index: community_name
)
rescue Elastic::Transport::Transport::Errors::BadRequest => e
raise e unless e.message.include?('Limit of total fields')

increase_total_fields_limit
retry
end

private

def parse_s3_key
# S3 key format: {community_name}/{ctid}.json
parts = s3_key.split('/')
@community_name = parts[0..-2].join('/')
@ctid = parts.last.sub(/\.json\z/i, '')
end

def graph_content
@graph_content ||= s3_object.get.body.read
end

def graph_json
@graph_json ||= JSON.parse(graph_content).to_json
end

def client
@client ||= Elasticsearch::Client.new(host: elasticsearch_address)
end

def elasticsearch_address
ENV['ELASTICSEARCH_ADDRESS'].presence
end

def s3_bucket
@s3_bucket ||= s3_resource.bucket(s3_bucket_name)
end

def s3_bucket_name
ENV['ENVELOPE_GRAPHS_BUCKET'].presence
end

def s3_object
@s3_object ||= s3_bucket.object(s3_key)
end

def s3_resource
@s3_resource ||= Aws::S3::Resource.new(region: ENV['AWS_REGION'].presence)
end

def increase_total_fields_limit
settings = client.indices.get_settings(index: community_name)

current_limit = settings
.dig(community_name, 'settings', 'index', 'mapping', 'total_fields', 'limit')
.to_i
# Fall back to the Elasticsearch default when the limit has never been set explicitly
current_limit = 1000 if current_limit.zero?

client.indices.put_settings(
body: { 'index.mapping.total_fields.limit' => current_limit * 2 },
index: community_name
)
end
end
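
Because the service only needs the S3 and Elasticsearch environment variables (no database access), a single graph can be reindexed ad hoc. The key below is hypothetical and only illustrates the expected `{community_name}/{ctid}.json` layout; `rails runner` is assumed as the console entry point and may need adjusting for this codebase.

```bash
# Hypothetical key; substitute a real {community_name}/{ctid}.json path from the bucket.
ENVELOPE_GRAPHS_BUCKET=my-graphs-bucket \
ELASTICSEARCH_ADDRESS=http://localhost:9200 \
AWS_REGION=us-east-1 \
bundle exec rails runner 'IndexS3GraphToEs.call("ce_registry/ce-0000-1111.json")'
```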
77 changes: 77 additions & 0 deletions lib/tasks/s3.rake
@@ -0,0 +1,77 @@
namespace :s3 do
desc 'Index all S3 JSON-LD graphs to Elasticsearch (S3 as source of truth)'
task index_all_to_es: :environment do
require 'benchmark'
require 'json'

bucket_name = ENV['ENVELOPE_GRAPHS_BUCKET']
abort 'ENVELOPE_GRAPHS_BUCKET environment variable is not set' unless bucket_name

es_address = ENV['ELASTICSEARCH_ADDRESS']
abort 'ELASTICSEARCH_ADDRESS environment variable is not set' unless es_address

$stdout.sync = true

s3 = Aws::S3::Resource.new(region: ENV['AWS_REGION'].presence)
bucket = s3.bucket(bucket_name)

errors = {}
processed = 0
skipped = 0

puts "Starting S3 to ES indexing from bucket: #{bucket_name}"
puts "Elasticsearch address: #{es_address}"
puts "Counting objects..."

# Count total objects for progress reporting
total = bucket.objects.count { |obj| obj.key.end_with?('.json') }
puts "Found #{total} JSON files to index"
puts "Started at #{Time.now.utc}"

time = Benchmark.measure do
bucket.objects.each do |object|
unless object.key.end_with?('.json')
skipped += 1
next
end

processed += 1

begin
IndexS3GraphToEs.call(object.key)
rescue StandardError => e
errors[object.key] = "#{e.class}: #{e.message}"
end

# Progress every 100 records
if (processed % 100).zero?
puts "Progress: processed=#{processed}/#{total} errors=#{errors.size} skipped=#{skipped}"
end
end
end

puts time
puts "Finished at #{Time.now.utc} - processed=#{processed}, errors=#{errors.size}"

# Write errors to file
if errors.any?
File.write('/tmp/s3_index_errors.json', JSON.pretty_generate(errors))
puts "Wrote /tmp/s3_index_errors.json (#{errors.size} entries)"

# Upload errors to S3
begin
error_bucket = ENV['S3_ERRORS_BUCKET'] || bucket_name
error_key = "errors/s3-index-errors-#{Time.now.utc.strftime('%Y%m%dT%H%M%SZ')}.json"
s3_client = Aws::S3::Client.new(region: ENV['AWS_REGION'].presence)
s3_client.put_object(
bucket: error_bucket,
key: error_key,
body: File.open('/tmp/s3_index_errors.json', 'rb')
)
puts "Uploaded errors to s3://#{error_bucket}/#{error_key}"
rescue StandardError => e
warn "Failed to upload errors to S3: #{e.class}: #{e.message}"
end

warn "Encountered #{errors.size} errors. Sample: #{errors.to_a.first(5).to_h.inspect}"
exit 1
end
end
end
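
A minimal local invocation of the task could look like this; the bucket, region, and Elasticsearch address are placeholders. The non-zero exit on errors makes the task easy to gate in CI or an Argo step.

```bash
ENVELOPE_GRAPHS_BUCKET=my-graphs-bucket \
ELASTICSEARCH_ADDRESS=http://localhost:9200 \
AWS_REGION=us-east-1 \
bundle exec rake s3:index_all_to_es
```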
@@ -0,0 +1,141 @@
# Argo Workflows

These manifests install a minimal Argo Workflows control plane into the shared `credreg-staging` namespace. The controller and server components rely on a shared PostgreSQL database (for example, the RDS modules under `terraform/environments/eks`) for workflow persistence.

## Components
- `externalsecret.yaml` – syncs the AWS Secrets Manager entry `credreg-argo-workflows` into a Kubernetes Secret named `argo-postgres`.
- `configmap.yaml` – controller configuration that enables Postgres-based persistence; set the host/database here, while credentials come from the synced secret.
- `rbac.yaml` – service accounts plus the RBAC needed by the workflow controller and Argo server.
- `workflow-controller-deployment.yaml` – runs `workflow-controller` with the standard `argoexec` image.
- `argo-server.yaml` – exposes the Argo UI/API inside the cluster on port `2746`.
- `argo-basic-auth-externalsecret.yaml` – syncs the AWS Secrets Manager entry `credreg-argo-basic-auth` (or similar) to supply the base64-encoded `user:password` string for ingress auth.
- `argo-server-ingress.yaml` – optional HTTPS ingress + certificate (via cert-manager + Let's Encrypt) and basic auth for external access to the Argo UI.

## Before applying
1. **Provision or reference a PostgreSQL instance.** Ensure the desired environment has a reachable database endpoint.
2. **Create the Secrets Manager entry.** Create `credreg-argo-workflows` (or adjust the `remoteRef.key` value) with JSON keys `host`, `port`, `database`, `username`, `password`, `sslmode`; a CLI sketch follows this list. The External Secrets Operator syncs it into the cluster, and the controller/server pick the values up via env vars.
3. **Update `configmap.yaml`.** Set `persistence.postgresql.host` (and database/table names if they differ) for the target environment. Even though credentials are secret-backed, Argo still requires the host in this config.
4. **Install Argo CRDs.** Apply the upstream CRDs from https://github.com/argoproj/argo-workflows/releases (required only once per cluster) before rolling out these manifests.
5. **Configure DNS if using the ingress.** Update `argo-server-ingress.yaml` with the desired hostname(s) and point the DNS record at the ingress controller's load balancer.
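
For reference, a sketch of creating the connection secret with the AWS CLI; every value shown is a placeholder and must match your database.

```bash
# Placeholder values; the JSON keys must match what externalsecret.yaml maps into argo-postgres.
aws secretsmanager create-secret \
  --region us-east-1 \
  --name credreg-argo-workflows \
  --secret-string '{
    "host": "argo-db.example.internal",
    "port": "5432",
    "database": "argo",
    "username": "argo",
    "password": "change-me",
    "sslmode": "require"
  }'
```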

## Apply order
```bash
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/externalsecret.yaml
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/rbac.yaml
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/configmap.yaml
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/workflow-controller-deployment.yaml
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/argo-server.yaml
# Optional ingress / certificate
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/argo-basic-auth-externalsecret.yaml
kubectl apply -f terraform/environments/eks/k8s-manifests-staging/argo-workflow/argo-server-ingress.yaml
```

Once the `argo-postgres` secret is synced and the controller connects to Postgres successfully, `kubectl get wf -n credreg-staging` should show persisted workflows even after pod restarts.
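
A few checks that help before submitting anything (the `workflow-controller` deployment name is assumed from the manifest filename):

```bash
# Confirm the secret synced and the controller started without persistence errors.
kubectl -n credreg-staging get externalsecrets
kubectl -n credreg-staging get secret argo-postgres
kubectl -n credreg-staging logs deploy/workflow-controller --tail=50
```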

## Workflow Templates

### index-s3-to-es

Indexes all JSON-LD graphs from S3 directly to Elasticsearch. S3 is treated as the source of truth.

**Architecture:**
```
Argo Workflow (curl container)
├── 1. POST to Keycloak /token (client credentials grant)
│       → Obtain fresh JWT
└── 2. POST /workflows/index-all-s3-to-es
        Registry API
        ├──▶ List S3 bucket objects
        └──▶ For each .json file:
             └──▶ Index to Elasticsearch
```
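
The same two steps can be reproduced from a shell for debugging; the client ID and secret are placeholders, the token endpoint matches the `keycloak-url` default listed under workflow parameters below, and the API URL matches the `api-base-url` default.

```bash
# 1. Client-credentials grant against Keycloak (placeholder client id/secret).
TOKEN=$(curl -s -X POST \
  "https://test-ce-kc-002.credentialengine.org/realms/CE-Test/protocol/openid-connect/token" \
  -d grant_type=client_credentials \
  -d client_id=argo-workflows \
  -d client_secret="$CLIENT_SECRET" | jq -r .access_token)

# 2. Trigger the indexing endpoint on the Registry API.
curl -s -X POST \
  "http://main-app.credreg-staging.svc.cluster.local:9292/workflows/index-all-s3-to-es" \
  -H "Authorization: Bearer $TOKEN"
```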

**Prerequisites - Keycloak Service Account:**

1. Create a Keycloak client in the `CE-Test` realm:
- **Client ID**: e.g., `argo-workflows`
- **Client authentication**: ON (confidential client)
- **Service accounts roles**: ON
- **Authentication flow**: Only "Service accounts roles" enabled

2. Assign the admin role to the service account (a verification sketch follows this list):
- Go to the client → Service Account Roles
- Assign `ROLE_ADMINISTRATOR` from the `RegistryAPI` client

3. Get the client secret:
- Go to the client → Credentials
- Copy the Client Secret (regenerate it if one has not been issued)
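
To confirm the role actually lands in the token, the JWT obtained in the sketch above can be decoded; `resource_access` is the usual Keycloak claim for client roles, though the exact layout depends on realm configuration.

```bash
# Decode the base64url JWT payload and inspect client roles (expect ROLE_ADMINISTRATOR under RegistryAPI).
payload=$(printf '%s' "$TOKEN" | cut -d. -f2 | tr '_-' '/+')
while [ $(( ${#payload} % 4 )) -ne 0 ]; do payload="${payload}="; done
printf '%s' "$payload" | base64 -d | jq '.resource_access'
```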


**Required configuration:**

1. **Keycloak Credentials Secret** (`argo-keycloak-credentials`; a creation sketch follows this list):
- `client_id` – Keycloak client ID
- `client_secret` – Keycloak client secret

2. **Registry API environment variables** (already in app-configmap):
- `ENVELOPE_GRAPHS_BUCKET` – S3 bucket containing JSON-LD graphs
- `ELASTICSEARCH_ADDRESS` – Elasticsearch endpoint
- `AWS_REGION` – AWS region for S3 access
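
If `argo-keycloak-credentials` is not synced by an ExternalSecret, it can be created by hand as a plain Kubernetes secret; the values are placeholders and the key names match the list above.

```bash
# Placeholder values for the two keys listed above.
kubectl -n credreg-staging create secret generic argo-keycloak-credentials \
  --from-literal=client_id=argo-workflows \
  --from-literal=client_secret='change-me'
```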

**Trigger the workflow:**

Via Argo CLI:
```bash
argo submit --from workflowtemplate/index-s3-to-es -n credreg-staging
```

Via Argo REST API:
```bash
kubectl port-forward -n credreg-staging svc/argo-server 2746:2746
BEARER=$(kubectl create token argo-server -n credreg-staging)

curl -sk https://localhost:2746/api/v1/workflows/credreg-staging \
  -H "Authorization: Bearer $BEARER" \
  -H 'Content-Type: application/json' \
  -d '{
        "workflow": {
          "metadata": { "generateName": "index-s3-to-es-" },
          "spec": { "workflowTemplateRef": { "name": "index-s3-to-es" } }
        }
      }'
```

Via Argo UI:
1. Navigate to the Argo UI
2. Go to Workflow Templates
3. Select `index-s3-to-es`
4. Click "Submit"

**Monitor workflow:**
```bash
# List workflows
kubectl get wf -n credreg-staging

# Watch workflow status
argo watch -n credreg-staging <workflow-name>

# View logs
argo logs -n credreg-staging <workflow-name>
```

**Workflow parameters:**

| Parameter | Default | Description |
|-----------|---------|-------------|
| `api-base-url` | `http://main-app.credreg-staging.svc.cluster.local:9292` | Registry API base URL |
| `keycloak-url` | `https://test-ce-kc-002.credentialengine.org/realms/CE-Test/protocol/openid-connect/token` | Keycloak token endpoint |

Override parameters when submitting:
```bash
argo submit --from workflowtemplate/index-s3-to-es \
-p api-base-url=http://custom-api:9292 \
-p keycloak-url=https://other-keycloak/realms/X/protocol/openid-connect/token \
-n credreg-staging
```