Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 128 additions & 135 deletions subcommand/health-sync/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"github.com/hashicorp/go-multierror"
)

// checkStatus is the computed, desired state of one Consul health check:
// the status to report plus the message that accompanies it. Both fields
// are plain strings, so two values can be compared directly with ==.
type checkStatus struct {
	// consulStatus is the Consul health status
	// (api.HealthPassing or api.HealthCritical).
	consulStatus string
	// output is the message to display in Consul.
	output string
}

// fetchHealthChecks fetches the Consul health checks for both the service
// and proxy registrations
func (c *Command) fetchHealthChecks(consulClient *api.Client, taskMeta awsutil.ECSTaskMeta) (map[string]*api.HealthCheck, error) {
Expand Down Expand Up @@ -62,163 +68,147 @@ func (c *Command) fetchHealthChecks(consulClient *api.Client, taskMeta awsutil.E
return healthCheckMap, nil
}

// setChecksCritical sets checks for all of the containers to critical
func (c *Command) setChecksCritical(consulClient *api.Client, taskMeta awsutil.ECSTaskMeta, clusterARN string, parsedContainerNames []string) error {
// setChecksCritical sets checks for all of the containers to critical.
// Used during graceful shutdown (SIGTERM).
//
// Every name in containerNames is recorded as ecs.HealthStatusUnhealthy, the
// affected check IDs and their statuses are derived with computeCheckStatuses,
// and each one is pushed through updateConsulHealthStatus. A failed update is
// logged and accumulated with multierror.Append; the remaining checks are
// still attempted. The accumulated error is returned (nil when every update
// succeeded).
//
// NOTE(review): this text looks like a rendered diff, so removed and added
// lines seem to be interleaved below without +/- markers. Statements that use
// parsedContainerNames or handleHealthForDataplaneContainer are presumably
// the removed side — confirm against the real file.
func (c *Command) setChecksCritical(consulClient *api.Client, taskMeta awsutil.ECSTaskMeta, clusterARN string, containerNames []string) error {
var result error

taskID := taskMeta.TaskID()
serviceName := c.constructServiceName(taskMeta.Family)
serviceID := makeServiceID(serviceName, taskMeta.TaskID())

for _, containerName := range parsedContainerNames {
var err error
if containerName == config.ConsulDataplaneContainerName {
err = c.handleHealthForDataplaneContainer(consulClient, taskID, serviceName, clusterARN, containerName, ecs.HealthStatusUnhealthy)
} else {
checkID := constructCheckID(makeServiceID(serviceName, taskID), containerName)
err = c.updateConsulHealthStatus(consulClient, checkID, clusterARN, ecs.HealthStatusUnhealthy)
}
// Create a map with all containers as unhealthy to get all check IDs
containerStatuses := make(map[string]string)
for _, name := range containerNames {
containerStatuses[name] = ecs.HealthStatusUnhealthy
}

if err == nil {
c.log.Info("set Consul health status to critical",
"container", containerName)
} else {
c.log.Warn("failed to set Consul health status to critical",
"err", err,
"container", containerName)
// Use computeCheckStatuses to get all check IDs
checkStatuses := c.computeCheckStatuses(serviceID, containerNames, containerStatuses)

// Update all checks to critical
// (map iteration: the order of updates and log lines is not fixed).
for checkID, status := range checkStatuses {
err := c.updateConsulHealthStatus(consulClient, checkID, clusterARN, status)
if err != nil {
c.log.Warn("failed to set Consul health status to critical", "err", err, "checkID", checkID)
result = multierror.Append(result, err)
} else {
c.log.Info("set Consul health status to critical", "checkID", checkID)
}
}

return result
}

// syncChecks fetches metadata for the ECS task and uses that metadata to
// update the Consul TTL checks for the containers specified in
// `parsedContainerNames`. Checks are only updated if they have changed since
// the last invocation of this function.
func (c *Command) syncChecks(consulClient *api.Client,
currentStatuses map[string]string,
clusterARN string,
parsedContainerNames []string) map[string]string {
// Fetch task metadata to get latest health of the containers
taskMeta, err := awsutil.ECSTaskMetadata()
if err != nil {
c.log.Error("unable to get task metadata", "err", err)
return currentStatuses
}

serviceName := c.constructServiceName(taskMeta.Family)
containersToSync, missingContainers := findContainersToSync(parsedContainerNames, taskMeta)

// Mark the Consul health status as critical for missing containers
for _, name := range missingContainers {
checkID := constructCheckID(makeServiceID(serviceName, taskMeta.TaskID()), name)
c.log.Debug("marking container as unhealthy since it wasn't found in the task metadata", "name", name)
// computeOverallDataplaneHealth computes the aggregate health status.
//
// It returns ecs.HealthStatusHealthy only when containerStatuses is non-empty
// and every value equals ecs.HealthStatusHealthy. Any other value (not only
// the explicit unhealthy status) or an empty map yields
// ecs.HealthStatusUnhealthy.
//
// NOTE(review): this text looks like a rendered diff, so removed and added
// lines seem to be interleaved below without +/- markers. The statements that
// reference err, name, consulClient or currentStatuses are presumably the
// removed side — confirm against the real file.
func computeOverallDataplaneHealth(containerStatuses map[string]string) string {
if len(containerStatuses) == 0 {
// This should not be possible in practice since containerNames always
// includes at least the dataplane container. Treat as unhealthy to be safe.
return ecs.HealthStatusUnhealthy
}

var err error
if name == config.ConsulDataplaneContainerName {
err = c.handleHealthForDataplaneContainer(consulClient, taskMeta.TaskID(), serviceName, clusterARN, name, ecs.HealthStatusUnhealthy)
} else {
err = c.updateConsulHealthStatus(consulClient, checkID, clusterARN, ecs.HealthStatusUnhealthy)
}

if err != nil {
c.log.Error("failed to update Consul health status for missing container", "err", err, "container", name)
} else {
c.log.Info("container health check updated in Consul for missing container", "container", name)
currentStatuses[name] = api.HealthCritical
// Anything other than ecs.HealthStatusHealthy counts as unhealthy; the
// first such value short-circuits the scan.
for _, status := range containerStatuses {
if status != ecs.HealthStatusHealthy {
return ecs.HealthStatusUnhealthy
}
}
return ecs.HealthStatusHealthy
}

parsedContainers := make(map[string]string)
// iterate over parse
for _, container := range containersToSync {
c.log.Debug("updating Consul check from ECS container health",
"name", container.Name,
"status", container.Health.Status,
"statusSince", container.Health.StatusSince,
"exitCode", container.Health.ExitCode,
)
parsedContainers[container.Name] = container.Health.Status
previousStatus := currentStatuses[container.Name]
if container.Health.Status != previousStatus {
var err error
if container.Name != config.ConsulDataplaneContainerName {
checkID := constructCheckID(makeServiceID(serviceName, taskMeta.TaskID()), container.Name)
err = c.updateConsulHealthStatus(consulClient, checkID, clusterARN, container.Health.Status)
}
// computeCheckStatuses computes the desired Consul health status for each check.
// Returns a map of checkID -> checkStatus containing both Consul status and output message.
//
// For the config.ConsulDataplaneContainerName entry the service check — and,
// unless c.config.IsGateway() reports true, the proxy check built from
// makeProxySvcIDAndName — receives the aggregate health returned by
// computeOverallDataplaneHealth(containerStatuses). Every other name in
// containerNames gets its own check, keyed by constructCheckID(serviceID, name),
// carrying that container's individual status. A name that is absent from
// containerStatuses is looked up as "" (the map zero value).
//
// NOTE(review): this text looks like a rendered diff, so removed and added
// lines seem to be interleaved below without +/- markers. The statements that
// reference err, container, parsedContainers or
// overallDataplaneHealthStatus are presumably the removed side — confirm
// against the real file.
func (c *Command) computeCheckStatuses(serviceID string, containerNames []string, containerStatuses map[string]string) map[string]checkStatus {
checkStatuses := make(map[string]checkStatus)

// Overall dataplane health is the aggregate of all container statuses
overallECSHealth := computeOverallDataplaneHealth(containerStatuses)
overallConsulHealth := ecsHealthToConsulHealth(overallECSHealth)

if err != nil {
c.log.Warn("failed to update Consul health status", "err", err)
} else {
c.log.Info("container health check updated in Consul",
"name", container.Name,
"status", container.Health.Status,
"statusSince", container.Health.StatusSince,
"exitCode", container.Health.ExitCode,
)
currentStatuses[container.Name] = container.Health.Status
for _, name := range containerNames {
if name == config.ConsulDataplaneContainerName {
// Dataplane container maps to overall health on service check
serviceCheckID := constructCheckID(serviceID, name)
// The second %q (after "for container") is filled with the check ID,
// not the bare container name.
checkStatuses[serviceCheckID] = checkStatus{
consulStatus: overallConsulHealth,
output: fmt.Sprintf("ECS health status is %q for container %q", overallECSHealth, serviceCheckID),
}
}

}
overallDataplaneHealthStatus, ok := parsedContainers[config.ConsulDataplaneContainerName]
// if dataplane container exist and healthy then proceed to checking the other containers health
if ok && overallDataplaneHealthStatus == ecs.HealthStatusHealthy {
//
for _, healthStatus := range parsedContainers {
// as soon as we find any unhealthy container, we can set the dataplane health to unhealthy
if healthStatus != ecs.HealthStatusHealthy {
overallDataplaneHealthStatus = ecs.HealthStatusUnhealthy
break
// Non-gateways also have a proxy check
if !c.config.IsGateway() {
// Only the proxy service ID is needed; the second return value is discarded.
proxySvcID, _ := makeProxySvcIDAndName(serviceID, "")
proxyCheckID := constructCheckID(proxySvcID, name)
checkStatuses[proxyCheckID] = checkStatus{
consulStatus: overallConsulHealth,
output: fmt.Sprintf("ECS health status is %q for container %q", overallECSHealth, proxyCheckID),
}
}
} else {
// Non-dataplane containers map directly to their individual check
checkID := constructCheckID(serviceID, name)
ecsHealth := containerStatuses[name]
checkStatuses[checkID] = checkStatus{
consulStatus: ecsHealthToConsulHealth(ecsHealth),
output: fmt.Sprintf("ECS health status is %q for container %q", ecsHealth, checkID),
}
}
} else {
// If no dataplane container or dataplane container is not healthy set overall health to unhealthy
overallDataplaneHealthStatus = ecs.HealthStatusUnhealthy
}

err = c.handleHealthForDataplaneContainer(consulClient, taskMeta.TaskID(), serviceName, clusterARN, config.ConsulDataplaneContainerName, overallDataplaneHealthStatus)
return checkStatuses
}

// syncChecks fetches ECS task metadata and updates Consul health checks
// for the specified containers. Checks are only updated when their status
// has changed since the last invocation.
//
// previousStatuses maps check ID to the checkStatus recorded by the previous
// call. The returned map holds the newly computed status for every check,
// except that a check whose update failed keeps its previous entry so the
// comparison differs again next time. If the task metadata cannot be fetched,
// the error is logged and previousStatuses is returned unchanged.
// The result is presumably passed back in as previousStatuses on the next
// invocation — confirm at the call site.
//
// NOTE(review): this text looks like a rendered diff, so removed and added
// lines seem to be interleaved below without +/- markers. The embedded
// handleHealthForDataplaneContainer definition and the stray
// "return currentStatuses" near the top are presumably the removed side —
// confirm against the real file.
func (c *Command) syncChecks(consulClient *api.Client,
previousStatuses map[string]checkStatus,
clusterARN string,
containerNames []string) map[string]checkStatus {

// Phase 1: Gather current container state
taskMeta, err := awsutil.ECSTaskMetadata()
if err != nil {
c.log.Warn("failed to update Consul health status", "err", err)
c.log.Error("unable to get task metadata", "err", err)
return previousStatuses
}

return currentStatuses
}
serviceName := c.constructServiceName(taskMeta.Family)
serviceID := makeServiceID(serviceName, taskMeta.TaskID())
containerStatuses := getContainerHealthStatuses(containerNames, taskMeta)

// handleHealthForDataplaneContainer takes care of the special handling needed for syncing
// the health of consul-dataplane container. We register two checks (one for the service
// and the other for proxy) when registering a typical service to the catalog. Updates
// should also happen twice in such cases.
func (c *Command) handleHealthForDataplaneContainer(consulClient *api.Client, taskID, serviceName, clusterARN, containerName, ecsHealthStatus string) error {
var checkID string
serviceID := makeServiceID(serviceName, taskID)
if c.config.IsGateway() {
checkID = constructCheckID(serviceID, containerName)
return c.updateConsulHealthStatus(consulClient, checkID, clusterARN, ecsHealthStatus)
}
// Phase 2: Turn ecs container health into consul checks
currentStatuses := c.computeCheckStatuses(serviceID, containerNames, containerStatuses)

checkID = constructCheckID(serviceID, containerName)
err := c.updateConsulHealthStatus(consulClient, checkID, clusterARN, ecsHealthStatus)
if err != nil {
return err
// Phase 3: Update Consul for any checks that have changed
for checkID, status := range currentStatuses {
// A check ID missing from previousStatuses yields the zero checkStatus,
// which never equals a computed status (its output is always a
// non-empty formatted message), so first-time checks are always pushed.
previousStatus := previousStatuses[checkID]
if status == previousStatus {
continue
}

err := c.updateConsulHealthStatus(consulClient, checkID, clusterARN, status)
if err != nil {
c.log.Warn("failed to update Consul health status", "err", err, "checkID", checkID)
// Keep the previous status on error so we retry next cycle
currentStatuses[checkID] = previousStatus
} else {
c.log.Info("health check updated in Consul", "checkID", checkID, "status", status.consulStatus)
}
}

proxySvcID, _ := makeProxySvcIDAndName(serviceID, "")
checkID = constructCheckID(proxySvcID, containerName)
return c.updateConsulHealthStatus(consulClient, checkID, clusterARN, ecsHealthStatus)
// Phase 4: Return current status
return currentStatuses
}

func (c *Command) updateConsulHealthStatus(consulClient *api.Client, checkID string, clusterARN string, ecsHealthStatus string) error {
consulHealthStatus := ecsHealthToConsulHealth(ecsHealthStatus)

func (c *Command) updateConsulHealthStatus(consulClient *api.Client, checkID string, clusterARN string, status checkStatus) error {
check, ok := c.checks[checkID]
if !ok {
return fmt.Errorf("unable to find check with ID %s", checkID)
}

check.Status = consulHealthStatus
check.Output = fmt.Sprintf("ECS health status is %q for container %q", ecsHealthStatus, checkID)
check.Status = status.consulStatus
check.Output = status.output
c.checks[checkID] = check

updateCheckReq := &api.CatalogRegistration{
Expand All @@ -245,24 +235,27 @@ func constructCheckID(serviceID, containerName string) string {
return fmt.Sprintf("%s-%s", serviceID, containerName)
}

func findContainersToSync(containerNames []string, taskMeta awsutil.ECSTaskMeta) ([]awsutil.ECSTaskMetaContainer, []string) {
var ecsContainers []awsutil.ECSTaskMetaContainer
var missing []string

for _, container := range containerNames {
found := false
for _, ecsContainer := range taskMeta.Containers {
if ecsContainer.Name == container {
ecsContainers = append(ecsContainers, ecsContainer)
found = true
break
}
}
if !found {
missing = append(missing, container)
// getContainerHealthStatuses builds a map of container name to ECS health status.
// Missing containers are assigned ecs.HealthStatusUnhealthy.
//
// The result has one entry per distinct name in containerNames. Containers
// that appear in taskMeta.Containers but were not requested are ignored, and
// a found container's Health.Status string is copied as-is. If
// taskMeta.Containers lists the same name more than once, the last entry
// wins because the lookup map is overwritten.
//
// NOTE(review): this text looks like a rendered diff, so removed and added
// lines seem to be interleaved below without +/- markers. The
// "return ecsContainers, missing" statement is presumably the removed side —
// confirm against the real file.
func getContainerHealthStatuses(containerNames []string, taskMeta awsutil.ECSTaskMeta) map[string]string {
statuses := make(map[string]string)

// Build a lookup map from task metadata
taskContainers := make(map[string]string)
for _, container := range taskMeta.Containers {
taskContainers[container.Name] = container.Health.Status
}

// Map each requested container to its status
for _, name := range containerNames {
if status, found := taskContainers[name]; found {
statuses[name] = status
} else {
statuses[name] = ecs.HealthStatusUnhealthy
}
}
return ecsContainers, missing

return statuses
}

func ecsHealthToConsulHealth(ecsHealth string) string {
Expand Down
Loading