From 3a48660e4ac14f504211f77a1ca01c2228ec543b Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Tue, 25 Nov 2025 10:32:50 -0700 Subject: [PATCH 1/8] error_passing --- lib/zapt/tasks/shell_task.rb | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/zapt/tasks/shell_task.rb b/lib/zapt/tasks/shell_task.rb index 82b1fdf..cbb0736 100644 --- a/lib/zapt/tasks/shell_task.rb +++ b/lib/zapt/tasks/shell_task.rb @@ -24,7 +24,19 @@ def commands cmds, working_dir:nil, user:nil, host:nil, pem:nil, quiet:false, ig def command cmd, working_dir:nil, user:nil, host:nil, pem:nil, quiet:false, ignore_failure:false, dryrun:false, capture:false rval, status = run_cmd cmd, working_dir, user, host, pem, quiet, dryrun, ignore_failure, capture - raise Error.new "Command bad exit status #{cmd}" unless status or ignore_failure + unless status or ignore_failure + # Include command output in error message for better debugging + error_msg = "Command failed: #{cmd}" + if rval && !rval.strip.empty? + # Truncate very long output but preserve the end which usually has the error + output = rval.strip + if output.length > 2000 + output = "...(truncated)...\n" + output[-2000..-1] + end + error_msg += "\n\nOutput:\n#{output}" + end + raise Error.new error_msg + end return rval, status end From 29c2104e6611c5ecf8ba534148d848ceafe08878 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Tue, 25 Nov 2025 10:55:36 -0700 Subject: [PATCH 2/8] error_passing2 --- lib/zapt/system.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/zapt/system.rb b/lib/zapt/system.rb index 8f7c725..b59b252 100644 --- a/lib/zapt/system.rb +++ b/lib/zapt/system.rb @@ -22,7 +22,8 @@ def system cmd, user=nil, host=nil, pem=nil, quiet=false, ignore_failure=false, $logger.info "Running command: #{cmd}" unless quiet # alternatively use Open3.capture3 (this caused hangs but maybe -tt fixed the issue?) if USE_BACKTICKS - rval = `#{cmd}`; exit_status=($?.exitstatus == 0) + # Capture both stdout and stderr (2>&1) so errors are included in output + rval = `#{cmd} 2>&1`; exit_status=($?.exitstatus == 0) if(exit_status) $logger.info(rval) unless quiet; else From f2cdd9aee8a774da9379da4894fc79187a152a32 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Tue, 25 Nov 2025 11:13:58 -0700 Subject: [PATCH 3/8] new cluster def path --- lib/zapt/cluster_def_cf.rb | 363 +++++++++++++++++++++++++++++++++++ lib/zapt/commands/runtask.rb | 62 +++++- lib/zapt/version.rb | 2 +- 3 files changed, 423 insertions(+), 4 deletions(-) create mode 100644 lib/zapt/cluster_def_cf.rb diff --git a/lib/zapt/cluster_def_cf.rb b/lib/zapt/cluster_def_cf.rb new file mode 100644 index 0000000..74c31e1 --- /dev/null +++ b/lib/zapt/cluster_def_cf.rb @@ -0,0 +1,363 @@ +# CloudFormation-based cluster definition loader for zapt +# This module provides functions to load cluster definitions from CloudFormation +# stack outputs and EC2 instance tags instead of YAML files. +# +# Usage: +# Set CLUSTER_DEF_SOURCE=cloudformation to use CF-based loading +# Or call Zapt::ClusterDefCF.load_from_stack(stack_name) directly +# +require 'json' + +module Zapt + module ClusterDefCF + # Production RTP channel hosts configuration + # These are static since production comms servers are fixed infrastructure + PROD_RTP_CHANNEL_HOSTS = { + A: [ + 'https://comms0.wootmath.com', + 'https://comms1.wootmath.com', + 'https://comms2.wootmath.com' + ], + B: [ + 'https://comms3.wootmath.com', + 'https://comms4.wootmath.com', + 'https://comms5.wootmath.com' + ] + }.freeze + + class << self + # Check if we should use CloudFormation-based loading + def use_cloudformation? + ENV['CLUSTER_DEF_SOURCE'] == 'cloudformation' + end + + # Get the current instance's metadata using IMDSv2 + def get_instance_metadata(path) + # Get token for IMDSv2 + token = `curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" 2>/dev/null`.strip + return nil if token.empty? + + # Use token to get metadata + result = `curl -s -H "X-aws-ec2-metadata-token: #{token}" "http://169.254.169.254/latest/meta-data/#{path}" 2>/dev/null`.strip + result.empty? ? nil : result + end + + # Get the current instance ID from EC2 metadata + def get_current_instance_id + get_instance_metadata('instance-id') + end + + # Get the current region from EC2 metadata + def get_current_region + az = get_instance_metadata('placement/availability-zone') + az ? az.chop : nil # Remove last char (zone letter) to get region + end + + # Get instance tags using AWS CLI + def get_instance_tags(instance_id, region = nil) + region ||= get_current_region + cmd = "aws ec2 describe-tags --filters \"Name=resource-id,Values=#{instance_id}\" --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + return {} if result.empty? + + tags = {} + JSON.parse(result)['Tags'].each do |tag| + tags[tag['Key']] = tag['Value'] + end + tags + rescue JSON::ParserError + {} + end + + # Get CloudFormation stack outputs + def get_stack_outputs(stack_name, region = nil) + region ||= get_current_region || 'us-west-2' + cmd = "aws cloudformation describe-stacks --stack-name #{stack_name} --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + return {} if result.empty? + + outputs = {} + stack = JSON.parse(result)['Stacks']&.first + return {} unless stack + + (stack['Outputs'] || []).each do |output| + outputs[output['OutputKey']] = output['OutputValue'] + end + outputs + rescue JSON::ParserError + {} + end + + # Get CloudFormation stack parameters + def get_stack_parameters(stack_name, region = nil) + region ||= get_current_region || 'us-west-2' + cmd = "aws cloudformation describe-stacks --stack-name #{stack_name} --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + return {} if result.empty? + + params = {} + stack = JSON.parse(result)['Stacks']&.first + return {} unless stack + + (stack['Parameters'] || []).each do |param| + params[param['ParameterKey']] = param['ParameterValue'] + end + params + rescue JSON::ParserError + {} + end + + # Try to find associated routing stack for an instance stack + # Convention: instance stack "foo-box" -> routing stack "foo-box-routing" or "foo-routing" + def find_routing_stack(instance_stack_name, region = nil) + region ||= get_current_region || 'us-west-2' + + # Try common naming patterns + candidates = [ + "#{instance_stack_name}-routing", # snapper-box -> snapper-box-routing + instance_stack_name.sub(/-box$/, '-routing') # snapper-box -> snapper-routing + ].uniq + + candidates.each do |candidate| + outputs = get_stack_outputs(candidate, region) + return candidate if outputs.any? + end + + nil + end + + # Get routing info from routing stack (frontend_host, rtp_channel_hosts) + def get_routing_info(routing_stack_name, region = nil) + return {} unless routing_stack_name + + outputs = get_stack_outputs(routing_stack_name, region) + params = get_stack_parameters(routing_stack_name, region) + + info = {} + + # DNSName output is the frontend_host + if outputs['DNSName'] + info[:frontend_host] = "https://#{outputs['DNSName']}" + end + + # Build rtp_channel_hosts for dev VMs: + # - Channel A: shared dev comms server (comms-dev1.wootmath.com) + # - Channel B: VM-specific comms if CreateComms=true, otherwise same as A + shared_comms = "https://comms-dev1.wootmath.com/" + + if params['CreateComms'] == 'true' && params['Identifier'] && params['PrivateHostedZoneName'] + vm_comms = "https://comm-#{params['Identifier']}.#{params['PrivateHostedZoneName']}" + info[:rtp_channel_hosts] = { A: [shared_comms], B: [vm_comms] } + else + # No VM-specific comms, use shared for both channels + info[:rtp_channel_hosts] = { A: [shared_comms], B: [shared_comms] } + end + + info + end + + # Get all EC2 instances in a CloudFormation stack + def get_stack_instances(stack_name, region = nil) + region ||= get_current_region || 'us-west-2' + cmd = "aws ec2 describe-instances --filters \"Name=tag:aws:cloudformation:stack-name,Values=#{stack_name}\" \"Name=instance-state-name,Values=running\" --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + return [] if result.empty? + + instances = [] + JSON.parse(result)['Reservations'].each do |reservation| + reservation['Instances'].each do |instance| + instances << instance + end + end + instances + rescue JSON::ParserError + [] + end + + # Get all EC2 instances in an AutoScaling Group + def get_asg_instances(asg_name, region = nil) + region ||= get_current_region || 'us-west-2' + + # First get instance IDs from the ASG + asg_cmd = "aws autoscaling describe-auto-scaling-groups --auto-scaling-group-names #{asg_name} --region #{region} --output json 2>/dev/null" + asg_result = `#{asg_cmd}` + return [] if asg_result.empty? + + asg_data = JSON.parse(asg_result) + asg = asg_data['AutoScalingGroups']&.first + return [] unless asg + + instance_ids = asg['Instances']&.select { |i| i['LifecycleState'] == 'InService' }&.map { |i| i['InstanceId'] } + return [] if instance_ids.nil? || instance_ids.empty? + + # Then get full instance details + ids_str = instance_ids.join(' ') + ec2_cmd = "aws ec2 describe-instances --instance-ids #{ids_str} --region #{region} --output json 2>/dev/null" + ec2_result = `#{ec2_cmd}` + return [] if ec2_result.empty? + + instances = [] + JSON.parse(ec2_result)['Reservations'].each do |reservation| + reservation['Instances'].each do |instance| + instances << instance if instance['State']['Name'] == 'running' + end + end + instances + rescue JSON::ParserError + [] + end + + # Build node configuration from EC2 instance data + def build_node_from_instance(instance, default_user = 'ubuntu') + tags = {} + (instance['Tags'] || []).each { |t| tags[t['Key']] = t['Value'] } + + { + internal_ip: instance['PrivateIpAddress'], + public_ip: instance['PublicIpAddress'] || instance['PrivateIpAddress'], + user: tags['cluster-default-user'] || default_user, + id: instance['InstanceId'], + services: { + frontend: { + redirect_to_https: true + } + } + } + end + + # Load cluster definition from CloudFormation stack outputs + # This is the main function that provides the same interface as load_named_cluster_def + # It combines data from the instance stack and the associated routing stack (for dev boxes) + # or uses ASG discovery (for production clusters) + def load_from_stack(stack_name, region = nil) + region ||= get_current_region || 'us-west-2' + + $logger.info "Loading cluster definition from CloudFormation stack: #{stack_name}" if $logger + + # Get stack outputs + outputs = get_stack_outputs(stack_name, region) + if outputs.empty? + raise Zapt::Error.new("CloudFormation stack '#{stack_name}' not found or has no outputs") + end + + # Get instances - try ASG first (for production clusters), then CF stack instances (for dev boxes) + instances = [] + asg_name = outputs['AutoScalingGroupName'] + if asg_name + $logger.info "Found ASG: #{asg_name}, discovering instances..." if $logger + instances = get_asg_instances(asg_name, region) + end + + # Fall back to CF stack instances if no ASG or no ASG instances found + if instances.empty? + instances = get_stack_instances(stack_name, region) + end + + if instances.empty? + raise Zapt::Error.new("No running instances found in stack '#{stack_name}'") + end + + # Try to find associated routing stack for frontend_host and rtp_channel_hosts (dev boxes) + routing_stack = find_routing_stack(stack_name, region) + routing_info = {} + if routing_stack + routing_info = get_routing_info(routing_stack, region) + $logger.info "Found routing stack: #{routing_stack}" if $logger + end + + # Get first instance for default values + first_instance = instances.first + first_tags = {} + (first_instance['Tags'] || []).each { |t| first_tags[t['Key']] = t['Value'] } + + # Determine environment + env = outputs['ClusterEnv'] || first_tags['env'] || 'development' + is_production = (env == 'production') + + # Build cluster definition hash matching YAML structure + # Priority: routing stack > stack outputs > instance tags > defaults + cluster_def = { + name: outputs['ClusterName'] || stack_name, + region: outputs['ClusterRegion'] || region, + stacks: stack_name, + vpc: outputs['ClusterVpc'] || first_instance['VpcId'], + env: env, + staging_area: outputs['ClusterStagingArea'] || first_tags['cluster-staging-area'] || stack_name, + key: outputs['ClusterKey'] || first_tags['cluster-key'] || first_instance['KeyName'], + cluster_type: 'ec2', + is_admin: false, + site_dbname: outputs['ClusterSiteDbname'] || first_tags['cluster-site-dbname'], + # frontend_host: routing stack for dev, stack output for prod + frontend_host: routing_info[:frontend_host] || outputs['ClusterFrontendHost'] || first_tags['cluster-frontend-host'], + # rtp_channel_hosts: production uses static config, dev uses routing stack + rtp_channel_hosts: is_production ? PROD_RTP_CHANNEL_HOSTS : (routing_info[:rtp_channel_hosts] || parse_rtp_hosts(outputs['ClusterRtpChannelHosts'] || first_tags['cluster-rtp-channel-hosts'])), + nodes: [], + bins: nil, + debug: is_production ? { + bypass_varnish: false, + disable_app_cache: false, + disable_health_monitor: false + } : { + bypass_varnish: true, + disable_app_cache: true, + disable_health_monitor: true + } + } + + # Build nodes array + default_user = outputs['ClusterDefaultUser'] || 'ubuntu' + instances.each do |instance| + cluster_def[:nodes] << build_node_from_instance(instance, default_user) + end + + # Generate site_dbname if not set + cluster_def[:site_dbname] ||= "wootmath_site_#{cluster_def[:env]}_#{stack_name}" + + # Generate frontend_host if not set + cluster_def[:frontend_host] ||= "https://#{stack_name}.wootmath.com" + + $logger.info "Loaded cluster '#{cluster_def[:name]}' with #{cluster_def[:nodes].length} node(s) from CloudFormation" if $logger + + cluster_def + end + + # Parse RTP hosts into the expected hash structure + def parse_rtp_hosts(rtp_input) + return nil if rtp_input.nil? + return nil if rtp_input.is_a?(String) && rtp_input.empty? + + # Already a hash - normalize to symbol keys + if rtp_input.is_a?(Hash) + return { + A: Array(rtp_input[:A] || rtp_input['A']), + B: Array(rtp_input[:B] || rtp_input['B']) + } + end + + # Try to parse as JSON first (handles complex A/B structures) + if rtp_input.start_with?('{') + begin + parsed = JSON.parse(rtp_input) + return { + A: Array(parsed['A']), + B: Array(parsed['B']) + } + rescue JSON::ParserError + # Fall through to simple URL handling + end + end + + # Simple URL string -> same URL for both A and B channels + if rtp_input.start_with?('https://') + return { + A: [rtp_input], + B: [rtp_input] + } + end + + # Unknown format + nil + end + end + end +end diff --git a/lib/zapt/commands/runtask.rb b/lib/zapt/commands/runtask.rb index ec91be7..42f6f80 100644 --- a/lib/zapt/commands/runtask.rb +++ b/lib/zapt/commands/runtask.rb @@ -1,6 +1,7 @@ require 'json' require 'yaml' require_relative '../../zapt' +require_relative '../cluster_def_cf' module Zapt @@ -8,7 +9,7 @@ class CLI < Thor desc "runtask", "run the tasks specified in runlist" method_option :tasks, :aliases => "-t", :type=>:string, :default=>'tasks.rb', :required=>false, :desc => "Task file" method_option :runlist, :aliases => "-r", :type=>:array, :required=>true, :desc => "Run list" - method_option :cluster, :aliases => "-c", :type=>:string, :required=>false, :desc => "Specify cluster on which to run task" + method_option :cluster, :aliases => "-c", :type=>:string, :required=>false, :desc => "Specify cluster on which to run task (YAML file or stack name)" method_option :pem, :aliases => "-p", :type=>:string, :required=>false, :default=>"~/.ssh/dev-test-key.pem", :desc => "Remote command PEM" method_option :capture, :type=>:boolean, :required=>false, :desc => "bypass logging and capture output to stdout" def runtask @@ -26,9 +27,10 @@ def runtask if options[:cluster] cluster = options[:cluster] - $logger.error("Can't find cluster definition #{cluster}") and exit(1) unless File.exist?(cluster) task = Zapt::Tasks.registry[task] - cluster_config = YAML::load(IO.read(cluster)) + + # Load cluster config - try CloudFormation first, then YAML file + cluster_config = load_cluster_config(cluster) # pem from top level cluster config pem = "#{ENV['HOME']}/credentials/#{cluster_config[:key]}.pem" @@ -69,6 +71,60 @@ def parse_args string args ||= {} end + # Load cluster configuration from CloudFormation or YAML file + # Supports: + # - CloudFormation stack name (e.g., "jward", "snapper-box") + # - YAML file path (e.g., "/path/to/cluster.yaml") + # Environment variable CLUSTER_DEF_SOURCE controls behavior: + # - 'cloudformation': Only try CloudFormation + # - 'yaml': Only try YAML file + # - 'auto' (default): Try CloudFormation first, fall back to YAML + def load_cluster_config(cluster) + source = ENV['CLUSTER_DEF_SOURCE'] || 'auto' + + # Extract cluster name from path if it's a YAML file path + if cluster.end_with?('.yaml') + cluster_name = File.basename(cluster, '.yaml') + yaml_path = cluster + else + cluster_name = cluster + yaml_path = nil + end + + # Try CloudFormation first (unless explicitly set to yaml) + if source == 'cloudformation' || source == 'auto' + begin + cluster_config = Zapt::ClusterDefCF.load_from_stack(cluster_name) + return cluster_config + rescue Zapt::Error => e + if source == 'cloudformation' + raise e + end + # Auto mode: fall through to YAML + $logger.info "CloudFormation loading failed (#{e.message}), falling back to YAML" if $logger + end + end + + # Fall back to YAML file + if yaml_path.nil? + # Try to find YAML file in common locations + possible_paths = [ + cluster_name, + "#{cluster_name}.yaml", + File.join(Dir.pwd, 'cluster_defs', "#{cluster_name}.yaml"), + File.join(Dir.pwd, '..', 'common', 'cluster_defs', "#{cluster_name}.yaml") + ] + yaml_path = possible_paths.find { |p| File.exist?(p) } + end + + if yaml_path.nil? || !File.exist?(yaml_path) + raise Zapt::Error.new("Can't find cluster definition: #{cluster_name} (tried CloudFormation and YAML)") + end + + $logger.info "Loading cluster from YAML: #{yaml_path}" if $logger + YAML::load(IO.read(yaml_path)) + end + end end diff --git a/lib/zapt/version.rb b/lib/zapt/version.rb index f279469..0db97de 100644 --- a/lib/zapt/version.rb +++ b/lib/zapt/version.rb @@ -1,6 +1,6 @@ require 'yaml' module Zapt - VERSION = '1.0.1' + VERSION = '1.1.0' NAME = 'zapt' end From 2a3789e250a471c97f61bee3c418d5fa095e6c08 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Tue, 25 Nov 2025 11:44:59 -0700 Subject: [PATCH 4/8] Add caching, parallel fetching, and SSM support to CF loader - Add 60-second TTL caching for AWS API responses to reduce API calls during multi-task deployments - Parallelize independent AWS API calls (stack outputs, instances, routing stack) using threads for faster loading - Add SSM Parameter Store support for production RTP channel hosts with fallback to static defaults - Add routing stack discovery for dev boxes to get frontend_host - Add clear_cache! method for testing and forced refresh Session: claude -r session --- lib/zapt/cluster_def_cf.rb | 245 ++++++++++++++++++++++++------------- 1 file changed, 162 insertions(+), 83 deletions(-) diff --git a/lib/zapt/cluster_def_cf.rb b/lib/zapt/cluster_def_cf.rb index 74c31e1..fd2098a 100644 --- a/lib/zapt/cluster_def_cf.rb +++ b/lib/zapt/cluster_def_cf.rb @@ -6,13 +6,21 @@ # Set CLUSTER_DEF_SOURCE=cloudformation to use CF-based loading # Or call Zapt::ClusterDefCF.load_from_stack(stack_name) directly # +# Features: +# - Caches AWS API responses for 60 seconds to reduce API calls +# - Parallelizes independent AWS API calls for faster loading +# - Loads production RTP hosts from SSM Parameter Store (with fallback) +# require 'json' module Zapt module ClusterDefCF - # Production RTP channel hosts configuration - # These are static since production comms servers are fixed infrastructure - PROD_RTP_CHANNEL_HOSTS = { + # Cache for AWS API responses to reduce API calls during multi-task deployments + @cache = {} + @cache_ttl = 60 # seconds + + # Default production RTP channel hosts (fallback if SSM param not found) + DEFAULT_PROD_RTP_CHANNEL_HOSTS = { A: [ 'https://comms0.wootmath.com', 'https://comms1.wootmath.com', @@ -26,6 +34,22 @@ module ClusterDefCF }.freeze class << self + # Clear the cache (useful for testing or forced refresh) + def clear_cache! + @cache = {} + end + + # Get cached value or fetch and cache + def cached_fetch(cache_key, &block) + @cache ||= {} + if @cache[cache_key] && (Time.now - @cache[cache_key][:time]) < (@cache_ttl || 60) + return @cache[cache_key][:data] + end + data = block.call + @cache[cache_key] = { data: data, time: Time.now } + data + end + # Check if we should use CloudFormation-based loading def use_cloudformation? ENV['CLUSTER_DEF_SOURCE'] == 'cloudformation' @@ -33,13 +57,15 @@ def use_cloudformation? # Get the current instance's metadata using IMDSv2 def get_instance_metadata(path) - # Get token for IMDSv2 - token = `curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" 2>/dev/null`.strip - return nil if token.empty? - - # Use token to get metadata - result = `curl -s -H "X-aws-ec2-metadata-token: #{token}" "http://169.254.169.254/latest/meta-data/#{path}" 2>/dev/null`.strip - result.empty? ? nil : result + cached_fetch("metadata:#{path}") do + # Get token for IMDSv2 + token = `curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" 2>/dev/null`.strip + next nil if token.empty? + + # Use token to get metadata + result = `curl -s -H "X-aws-ec2-metadata-token: #{token}" "http://169.254.169.254/latest/meta-data/#{path}" 2>/dev/null`.strip + result.empty? ? nil : result + end end # Get the current instance ID from EC2 metadata @@ -56,15 +82,17 @@ def get_current_region # Get instance tags using AWS CLI def get_instance_tags(instance_id, region = nil) region ||= get_current_region - cmd = "aws ec2 describe-tags --filters \"Name=resource-id,Values=#{instance_id}\" --region #{region} --output json 2>/dev/null" - result = `#{cmd}` - return {} if result.empty? - - tags = {} - JSON.parse(result)['Tags'].each do |tag| - tags[tag['Key']] = tag['Value'] + cached_fetch("tags:#{instance_id}:#{region}") do + cmd = "aws ec2 describe-tags --filters \"Name=resource-id,Values=#{instance_id}\" --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + next {} if result.empty? + + tags = {} + JSON.parse(result)['Tags'].each do |tag| + tags[tag['Key']] = tag['Value'] + end + tags end - tags rescue JSON::ParserError {} end @@ -72,18 +100,20 @@ def get_instance_tags(instance_id, region = nil) # Get CloudFormation stack outputs def get_stack_outputs(stack_name, region = nil) region ||= get_current_region || 'us-west-2' - cmd = "aws cloudformation describe-stacks --stack-name #{stack_name} --region #{region} --output json 2>/dev/null" - result = `#{cmd}` - return {} if result.empty? + cached_fetch("outputs:#{stack_name}:#{region}") do + cmd = "aws cloudformation describe-stacks --stack-name #{stack_name} --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + next {} if result.empty? - outputs = {} - stack = JSON.parse(result)['Stacks']&.first - return {} unless stack + outputs = {} + stack = JSON.parse(result)['Stacks']&.first + next {} unless stack - (stack['Outputs'] || []).each do |output| - outputs[output['OutputKey']] = output['OutputValue'] + (stack['Outputs'] || []).each do |output| + outputs[output['OutputKey']] = output['OutputValue'] + end + outputs end - outputs rescue JSON::ParserError {} end @@ -91,22 +121,58 @@ def get_stack_outputs(stack_name, region = nil) # Get CloudFormation stack parameters def get_stack_parameters(stack_name, region = nil) region ||= get_current_region || 'us-west-2' - cmd = "aws cloudformation describe-stacks --stack-name #{stack_name} --region #{region} --output json 2>/dev/null" - result = `#{cmd}` - return {} if result.empty? + cached_fetch("params:#{stack_name}:#{region}") do + cmd = "aws cloudformation describe-stacks --stack-name #{stack_name} --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + next {} if result.empty? - params = {} - stack = JSON.parse(result)['Stacks']&.first - return {} unless stack + params = {} + stack = JSON.parse(result)['Stacks']&.first + next {} unless stack - (stack['Parameters'] || []).each do |param| - params[param['ParameterKey']] = param['ParameterValue'] + (stack['Parameters'] || []).each do |param| + params[param['ParameterKey']] = param['ParameterValue'] + end + params end - params rescue JSON::ParserError {} end + # Get SSM parameter value + def get_ssm_parameter(param_name, region = nil) + region ||= get_current_region || 'us-west-2' + cached_fetch("ssm:#{param_name}:#{region}") do + cmd = "aws ssm get-parameter --name #{param_name} --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + next nil if result.empty? + + JSON.parse(result).dig('Parameter', 'Value') + end + rescue JSON::ParserError + nil + end + + # Get production RTP channel hosts from SSM or use default + def get_prod_rtp_channel_hosts(region = nil) + # Try to load from SSM Parameter Store + ssm_value = get_ssm_parameter('/prod/rtp/channel-hosts', region) + if ssm_value + begin + parsed = JSON.parse(ssm_value) + return { + A: Array(parsed['A']), + B: Array(parsed['B']) + } + rescue JSON::ParserError + # Fall through to default + end + end + + # Return default if SSM not configured + DEFAULT_PROD_RTP_CHANNEL_HOSTS + end + # Try to find associated routing stack for an instance stack # Convention: instance stack "foo-box" -> routing stack "foo-box-routing" or "foo-routing" def find_routing_stack(instance_stack_name, region = nil) @@ -159,17 +225,19 @@ def get_routing_info(routing_stack_name, region = nil) # Get all EC2 instances in a CloudFormation stack def get_stack_instances(stack_name, region = nil) region ||= get_current_region || 'us-west-2' - cmd = "aws ec2 describe-instances --filters \"Name=tag:aws:cloudformation:stack-name,Values=#{stack_name}\" \"Name=instance-state-name,Values=running\" --region #{region} --output json 2>/dev/null" - result = `#{cmd}` - return [] if result.empty? - - instances = [] - JSON.parse(result)['Reservations'].each do |reservation| - reservation['Instances'].each do |instance| - instances << instance + cached_fetch("instances:#{stack_name}:#{region}") do + cmd = "aws ec2 describe-instances --filters \"Name=tag:aws:cloudformation:stack-name,Values=#{stack_name}\" \"Name=instance-state-name,Values=running\" --region #{region} --output json 2>/dev/null" + result = `#{cmd}` + next [] if result.empty? + + instances = [] + JSON.parse(result)['Reservations'].each do |reservation| + reservation['Instances'].each do |instance| + instances << instance + end end + instances end - instances rescue JSON::ParserError [] end @@ -177,32 +245,33 @@ def get_stack_instances(stack_name, region = nil) # Get all EC2 instances in an AutoScaling Group def get_asg_instances(asg_name, region = nil) region ||= get_current_region || 'us-west-2' - - # First get instance IDs from the ASG - asg_cmd = "aws autoscaling describe-auto-scaling-groups --auto-scaling-group-names #{asg_name} --region #{region} --output json 2>/dev/null" - asg_result = `#{asg_cmd}` - return [] if asg_result.empty? - - asg_data = JSON.parse(asg_result) - asg = asg_data['AutoScalingGroups']&.first - return [] unless asg - - instance_ids = asg['Instances']&.select { |i| i['LifecycleState'] == 'InService' }&.map { |i| i['InstanceId'] } - return [] if instance_ids.nil? || instance_ids.empty? - - # Then get full instance details - ids_str = instance_ids.join(' ') - ec2_cmd = "aws ec2 describe-instances --instance-ids #{ids_str} --region #{region} --output json 2>/dev/null" - ec2_result = `#{ec2_cmd}` - return [] if ec2_result.empty? - - instances = [] - JSON.parse(ec2_result)['Reservations'].each do |reservation| - reservation['Instances'].each do |instance| - instances << instance if instance['State']['Name'] == 'running' + cached_fetch("asg:#{asg_name}:#{region}") do + # First get instance IDs from the ASG + asg_cmd = "aws autoscaling describe-auto-scaling-groups --auto-scaling-group-names #{asg_name} --region #{region} --output json 2>/dev/null" + asg_result = `#{asg_cmd}` + next [] if asg_result.empty? + + asg_data = JSON.parse(asg_result) + asg = asg_data['AutoScalingGroups']&.first + next [] unless asg + + instance_ids = asg['Instances']&.select { |i| i['LifecycleState'] == 'InService' }&.map { |i| i['InstanceId'] } + next [] if instance_ids.nil? || instance_ids.empty? + + # Then get full instance details + ids_str = instance_ids.join(' ') + ec2_cmd = "aws ec2 describe-instances --instance-ids #{ids_str} --region #{region} --output json 2>/dev/null" + ec2_result = `#{ec2_cmd}` + next [] if ec2_result.empty? + + instances = [] + JSON.parse(ec2_result)['Reservations'].each do |reservation| + reservation['Instances'].each do |instance| + instances << instance if instance['State']['Name'] == 'running' + end end + instances end - instances rescue JSON::ParserError [] end @@ -229,36 +298,40 @@ def build_node_from_instance(instance, default_user = 'ubuntu') # This is the main function that provides the same interface as load_named_cluster_def # It combines data from the instance stack and the associated routing stack (for dev boxes) # or uses ASG discovery (for production clusters) + # + # Uses parallel fetching for independent API calls to improve performance def load_from_stack(stack_name, region = nil) region ||= get_current_region || 'us-west-2' $logger.info "Loading cluster definition from CloudFormation stack: #{stack_name}" if $logger - # Get stack outputs - outputs = get_stack_outputs(stack_name, region) + # Parallel fetch: stack outputs and instances (independent calls) + outputs = nil + instances = [] + routing_stack = nil + + threads = [] + threads << Thread.new { outputs = get_stack_outputs(stack_name, region) } + threads << Thread.new { instances = get_stack_instances(stack_name, region) } + threads << Thread.new { routing_stack = find_routing_stack(stack_name, region) } + threads.each(&:join) + if outputs.empty? raise Zapt::Error.new("CloudFormation stack '#{stack_name}' not found or has no outputs") end - # Get instances - try ASG first (for production clusters), then CF stack instances (for dev boxes) - instances = [] + # Check for ASG-based instances (parallel fetch already done for CF instances) asg_name = outputs['AutoScalingGroupName'] - if asg_name + if asg_name && instances.empty? $logger.info "Found ASG: #{asg_name}, discovering instances..." if $logger instances = get_asg_instances(asg_name, region) end - # Fall back to CF stack instances if no ASG or no ASG instances found - if instances.empty? - instances = get_stack_instances(stack_name, region) - end - if instances.empty? raise Zapt::Error.new("No running instances found in stack '#{stack_name}'") end - # Try to find associated routing stack for frontend_host and rtp_channel_hosts (dev boxes) - routing_stack = find_routing_stack(stack_name, region) + # Get routing info if routing stack found routing_info = {} if routing_stack routing_info = get_routing_info(routing_stack, region) @@ -274,6 +347,13 @@ def load_from_stack(stack_name, region = nil) env = outputs['ClusterEnv'] || first_tags['env'] || 'development' is_production = (env == 'production') + # Get RTP hosts (from SSM for production, routing stack for dev) + rtp_hosts = if is_production + get_prod_rtp_channel_hosts(region) + else + routing_info[:rtp_channel_hosts] || parse_rtp_hosts(outputs['ClusterRtpChannelHosts'] || first_tags['cluster-rtp-channel-hosts']) + end + # Build cluster definition hash matching YAML structure # Priority: routing stack > stack outputs > instance tags > defaults cluster_def = { @@ -289,8 +369,7 @@ def load_from_stack(stack_name, region = nil) site_dbname: outputs['ClusterSiteDbname'] || first_tags['cluster-site-dbname'], # frontend_host: routing stack for dev, stack output for prod frontend_host: routing_info[:frontend_host] || outputs['ClusterFrontendHost'] || first_tags['cluster-frontend-host'], - # rtp_channel_hosts: production uses static config, dev uses routing stack - rtp_channel_hosts: is_production ? PROD_RTP_CHANNEL_HOSTS : (routing_info[:rtp_channel_hosts] || parse_rtp_hosts(outputs['ClusterRtpChannelHosts'] || first_tags['cluster-rtp-channel-hosts'])), + rtp_channel_hosts: rtp_hosts, nodes: [], bins: nil, debug: is_production ? { From 7b2913a6253ce56bf0398de02f0b100ec8854509 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Tue, 25 Nov 2025 11:54:08 -0700 Subject: [PATCH 5/8] Pass CLUSTER_DEF_STACK to remote tasks for CF loading When running tasks remotely, pass the stack name via CLUSTER_DEF_STACK environment variable so the remote node can load cluster definition from CloudFormation even if the instance isn't tagged with the CF stack name. Uses 'rvmsudo env VAR=value' to pass the env var through sudo. Session: claude -r session --- lib/zapt/commands/runtask.rb | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/zapt/commands/runtask.rb b/lib/zapt/commands/runtask.rb index 42f6f80..ae38d15 100644 --- a/lib/zapt/commands/runtask.rb +++ b/lib/zapt/commands/runtask.rb @@ -36,17 +36,29 @@ def runtask pem = "#{ENV['HOME']}/credentials/#{cluster_config[:key]}.pem" nodes = cluster_config[:nodes] + + # Pass stack name to remote tasks so they can load cluster def from CloudFormation + # This is needed because remote instances may not have CF stack tags + # Use cluster_config[:name] (from CF), or extract from cluster path/arg + cluster_base = cluster.end_with?('.yaml') ? File.basename(cluster, '.yaml') : cluster + stack_name = cluster_config[:name] || cluster_base + nodes.each_with_index do |node| ip = Zapt.ip_from_node(node) user = node[:user] "Running task: #{task.task_name} on #{ip}" remote_task = ShellTask.new({}) remote_dir = File.dirname(File.join('zcripts', File.expand_path('tasks.rb').split('zcripts/')[1])) + + # Set CLUSTER_DEF_STACK env var so remote load_cluster_def can find the stack + # Use sudo env to pass environment variable through to the subprocess + env_var = "CLUSTER_DEF_STACK=#{stack_name}" + if options[:arglist] args = options[:arglist][i] - remote_task.command(%Q{cd #{remote_dir}; rvmsudo_secure_path=1 rvmsudo zapt runtask -r #{task.task_name} -a \\"#{args}\\"}, host:ip, user:user, pem:pem) + remote_task.command(%Q{cd #{remote_dir}; rvmsudo_secure_path=1 rvmsudo env #{env_var} zapt runtask -r #{task.task_name} -a \\"#{args}\\"}, host:ip, user:user, pem:pem) else - remote_task.command %Q{cd #{remote_dir}; rvmsudo_secure_path=1 rvmsudo zapt runtask #{options[:capture] ? '--capture' : ''} -r #{task.task_name}}, host:ip, user:user, pem:pem + remote_task.command %Q{cd #{remote_dir}; rvmsudo_secure_path=1 rvmsudo env #{env_var} zapt runtask #{options[:capture] ? '--capture' : ''} -r #{task.task_name}}, host:ip, user:user, pem:pem end end else From bd8d670d0898b401fd091db697151eeab92e7016 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Tue, 25 Nov 2025 12:56:09 -0700 Subject: [PATCH 6/8] local load --- lib/zapt/cluster_def_cf.rb | 41 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/lib/zapt/cluster_def_cf.rb b/lib/zapt/cluster_def_cf.rb index fd2098a..e290c67 100644 --- a/lib/zapt/cluster_def_cf.rb +++ b/lib/zapt/cluster_def_cf.rb @@ -400,6 +400,47 @@ def load_from_stack(stack_name, region = nil) cluster_def end + # Load cluster definition for the current EC2 instance + # Auto-detects the CloudFormation stack from instance tags and sets :this_node + # Returns a cluster_def hash with :this_node populated + def load_for_current_instance + instance_id = get_current_instance_id + raise Zapt::Error.new("Not running on EC2 instance (no instance-id from metadata)") unless instance_id + + region = get_current_region + raise Zapt::Error.new("Could not determine region from instance metadata") unless region + + $logger.info "Auto-detecting cluster for instance #{instance_id} in #{region}" if $logger + + # Get instance tags to find the stack name + tags = get_instance_tags(instance_id, region) + stack_name = tags['aws:cloudformation:stack-name'] + + unless stack_name + raise Zapt::Error.new("Instance #{instance_id} is not part of a CloudFormation stack (missing aws:cloudformation:stack-name tag)") + end + + $logger.info "Found stack name from instance tags: #{stack_name}" if $logger + + # Load the cluster definition from the stack + cluster_def = load_from_stack(stack_name, region) + + # Get this instance's internal IP to set :this_node + internal_ip = get_instance_metadata('local-ipv4') + + # Find this node in the nodes array and set :this_node + my_node = cluster_def[:nodes].find { |n| n[:internal_ip] == internal_ip } + + unless my_node + raise Zapt::Error.new("Could not find node with IP #{internal_ip} in cluster '#{cluster_def[:name]}'") + end + + cluster_def[:this_node] = my_node + $logger.info "Set this_node to instance with IP #{internal_ip}" if $logger + + cluster_def + end + # Parse RTP hosts into the expected hash structure def parse_rtp_hosts(rtp_input) return nil if rtp_input.nil? From 4ee190ccf4770d94289ca8fe04ed1455b3b3d474 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Wed, 26 Nov 2025 11:06:06 -0700 Subject: [PATCH 7/8] update logger --- lib/zapt/logger.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/zapt/logger.rb b/lib/zapt/logger.rb index 036f992..c321a81 100644 --- a/lib/zapt/logger.rb +++ b/lib/zapt/logger.rb @@ -3,7 +3,7 @@ class Logger attr_accessor :disabled end -$logger = Logger.new(STDOUT) +$logger = Logger.new(STDERR) $logger.level = Logger::INFO $logger.formatter = proc{ |level, datetime, progname, msg| From d4abef3414d12407633891b795a6801b815c3e79 Mon Sep 17 00:00:00 2001 From: Seth Paul Date: Wed, 26 Nov 2025 14:14:27 -0700 Subject: [PATCH 8/8] quiet cluster read logs --- lib/zapt/cluster_def_cf.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/zapt/cluster_def_cf.rb b/lib/zapt/cluster_def_cf.rb index e290c67..b213df9 100644 --- a/lib/zapt/cluster_def_cf.rb +++ b/lib/zapt/cluster_def_cf.rb @@ -303,7 +303,7 @@ def build_node_from_instance(instance, default_user = 'ubuntu') def load_from_stack(stack_name, region = nil) region ||= get_current_region || 'us-west-2' - $logger.info "Loading cluster definition from CloudFormation stack: #{stack_name}" if $logger + $logger.debug "Loading cluster definition from CloudFormation stack: #{stack_name}" if $logger # Parallel fetch: stack outputs and instances (independent calls) outputs = nil @@ -323,7 +323,7 @@ def load_from_stack(stack_name, region = nil) # Check for ASG-based instances (parallel fetch already done for CF instances) asg_name = outputs['AutoScalingGroupName'] if asg_name && instances.empty? - $logger.info "Found ASG: #{asg_name}, discovering instances..." if $logger + $logger.debug "Found ASG: #{asg_name}, discovering instances..." if $logger instances = get_asg_instances(asg_name, region) end @@ -335,7 +335,7 @@ def load_from_stack(stack_name, region = nil) routing_info = {} if routing_stack routing_info = get_routing_info(routing_stack, region) - $logger.info "Found routing stack: #{routing_stack}" if $logger + $logger.debug "Found routing stack: #{routing_stack}" if $logger end # Get first instance for default values @@ -395,7 +395,7 @@ def load_from_stack(stack_name, region = nil) # Generate frontend_host if not set cluster_def[:frontend_host] ||= "https://#{stack_name}.wootmath.com" - $logger.info "Loaded cluster '#{cluster_def[:name]}' with #{cluster_def[:nodes].length} node(s) from CloudFormation" if $logger + $logger.debug "Loaded cluster '#{cluster_def[:name]}' with #{cluster_def[:nodes].length} node(s) from CloudFormation" if $logger cluster_def end @@ -410,7 +410,7 @@ def load_for_current_instance region = get_current_region raise Zapt::Error.new("Could not determine region from instance metadata") unless region - $logger.info "Auto-detecting cluster for instance #{instance_id} in #{region}" if $logger + $logger.debug "Auto-detecting cluster for instance #{instance_id} in #{region}" if $logger # Get instance tags to find the stack name tags = get_instance_tags(instance_id, region) @@ -420,7 +420,7 @@ def load_for_current_instance raise Zapt::Error.new("Instance #{instance_id} is not part of a CloudFormation stack (missing aws:cloudformation:stack-name tag)") end - $logger.info "Found stack name from instance tags: #{stack_name}" if $logger + $logger.debug "Found stack name from instance tags: #{stack_name}" if $logger # Load the cluster definition from the stack cluster_def = load_from_stack(stack_name, region) @@ -436,7 +436,7 @@ def load_for_current_instance end cluster_def[:this_node] = my_node - $logger.info "Set this_node to instance with IP #{internal_ip}" if $logger + $logger.debug "Set this_node to instance with IP #{internal_ip}" if $logger cluster_def end