Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/common/ingest/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewMetrics(prefix string) *Metrics {
pulsarMessageProcessingDelay: promauto.NewGaugeVec(pulsarMessageProcessingDelayOpts, []string{"subscription", "partition"}),
pulsarMessagePublishTime: promauto.NewGaugeVec(pulsarMessagePublishTime, []string{"subscription", "partition"}),
pulsarMessagesProcessed: promauto.NewCounter(pulsarMessagesProcessedOpts),
eventsProcessed: promauto.NewCounterVec(eventsProcessedOpts, []string{"queue", "eventType", "msgType"}),
eventsProcessed: promauto.NewCounterVec(eventsProcessedOpts, []string{"queue", "eventType", "event_type", "msgType", "msg_type"}),
}
}

Expand Down Expand Up @@ -110,9 +110,9 @@ func (m *Metrics) RecordPulsarProcessingDelay(subscriptionName string, partition
}

func (m *Metrics) RecordEventSequenceProcessed(queue string, msgType string) {
m.eventsProcessed.With(map[string]string{"queue": queue, "eventType": JobSetEventsLabel, "msgType": msgType}).Inc()
m.eventsProcessed.With(map[string]string{"queue": queue, "eventType": JobSetEventsLabel, "event_type": JobSetEventsLabel, "msgType": msgType, "msg_type": msgType}).Inc()
}

func (m *Metrics) RecordControlPlaneEventProcessed(msgType string) {
m.eventsProcessed.With(map[string]string{"queue": "N/A", "eventType": ControlPlaneEventsLabel, "msgType": msgType}).Inc()
m.eventsProcessed.With(map[string]string{"queue": "N/A", "eventType": ControlPlaneEventsLabel, "event_type": ControlPlaneEventsLabel, "msgType": msgType, "msg_type": msgType}).Inc()
}
120 changes: 60 additions & 60 deletions internal/common/metrics/scheduler_metrics.go

Large diffs are not rendered by default.

38 changes: 20 additions & 18 deletions internal/executor/metrics/pod_metrics/cluster_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
)

const (
leasedPhase = "Leased"
queueLabel = "queue"
phaseLabel = "phase"
resourceTypeLabel = "resourceType"
nodeTypeLabel = "nodeType"
leasedPhase = "Leased"
queueLabel = "queue"
phaseLabel = "phase"
resourceTypeLabel = "resourceType"
resourceTypeLabelSnake = "resource"
nodeTypeLabel = "nodeType"
nodeTypeLabelSnake = "node_type"
)

const (
Expand All @@ -32,37 +34,37 @@ const (
var podCountDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"job_pod",
"Pods in different phases by queue",
[]string{queueLabel, phaseLabel, nodeTypeLabel}, nil,
[]string{queueLabel, phaseLabel, nodeTypeLabel, nodeTypeLabelSnake}, nil,
)

var podResourceRequestDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"job_pod_resource_request",
"Pod resource requests in different phases by queue",
[]string{queueLabel, phaseLabel, resourceTypeLabel, nodeTypeLabel}, nil,
[]string{queueLabel, phaseLabel, resourceTypeLabel, resourceTypeLabelSnake, nodeTypeLabel, nodeTypeLabelSnake}, nil,
)

var podResourceUsageDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"job_pod_resource_usage",
"Pod resource usage in different phases by queue",
[]string{queueLabel, phaseLabel, resourceTypeLabel, nodeTypeLabel}, nil,
[]string{queueLabel, phaseLabel, resourceTypeLabel, resourceTypeLabelSnake, nodeTypeLabel, nodeTypeLabelSnake}, nil,
)

var nodeCountDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"available_node_count",
"Number of nodes available for Armada jobs",
[]string{nodeTypeLabel}, nil,
[]string{nodeTypeLabel, nodeTypeLabelSnake}, nil,
)

var nodeAvailableResourceDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"available_node_resource_allocatable",
"Resource allocatable on nodes available for Armada jobs",
[]string{resourceTypeLabel, nodeTypeLabel}, nil,
[]string{resourceTypeLabel, resourceTypeLabelSnake, nodeTypeLabel, nodeTypeLabelSnake}, nil,
)

var nodeTotalResourceDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"available_node_resource_total",
"Total resource on nodes available for Armada jobs",
[]string{resourceTypeLabel, nodeTypeLabel}, nil,
[]string{resourceTypeLabel, resourceTypeLabelSnake, nodeTypeLabel, nodeTypeLabelSnake}, nil,
)

type ClusterContextMetrics struct {
Expand Down Expand Up @@ -208,27 +210,27 @@ func (m *ClusterContextMetrics) Collect(metrics chan<- prometheus.Metric) {
for phase, phaseMetric := range phaseMetrics {
for resourceType, request := range phaseMetric.resourceRequest {
metrics <- prometheus.MustNewConstMetric(podResourceRequestDesc, prometheus.GaugeValue,
request.AsApproximateFloat64(), queue, phase, resourceType, nodeType)
request.AsApproximateFloat64(), queue, phase, resourceType, resourceType, nodeType, nodeType)
}
for resourceType, usage := range phaseMetric.resourceUsage {
metrics <- prometheus.MustNewConstMetric(podResourceUsageDesc, prometheus.GaugeValue,
usage.AsApproximateFloat64(), queue, phase, resourceType, nodeType)
usage.AsApproximateFloat64(), queue, phase, resourceType, resourceType, nodeType, nodeType)
}
metrics <- prometheus.MustNewConstMetric(podCountDesc, prometheus.GaugeValue, phaseMetric.count, queue, phase, nodeType)
metrics <- prometheus.MustNewConstMetric(podCountDesc, prometheus.GaugeValue, phaseMetric.count, queue, phase, nodeType, nodeType)
}
}
}

for _, nodeGroup := range nodeGroupAllocationInfos {
metrics <- prometheus.MustNewConstMetric(nodeCountDesc, prometheus.GaugeValue, float64(len(nodeGroup.Nodes)), nodeGroup.NodeType)
metrics <- prometheus.MustNewConstMetric(nodeCountDesc, prometheus.GaugeValue, float64(len(nodeGroup.Nodes)), nodeGroup.NodeType, nodeGroup.NodeType)
for resourceType, allocatable := range nodeGroup.NodeGroupAllocatableCapacity {
metrics <- prometheus.MustNewConstMetric(nodeAvailableResourceDesc,
prometheus.GaugeValue, allocatable.AsApproximateFloat64(), resourceType,
nodeGroup.NodeType)
prometheus.GaugeValue, allocatable.AsApproximateFloat64(), resourceType, resourceType,
nodeGroup.NodeType, nodeGroup.NodeType)
}

for resourceType, total := range nodeGroup.NodeGroupCapacity {
metrics <- prometheus.MustNewConstMetric(nodeTotalResourceDesc, prometheus.GaugeValue, total.AsApproximateFloat64(), resourceType, nodeGroup.NodeType)
metrics <- prometheus.MustNewConstMetric(nodeTotalResourceDesc, prometheus.GaugeValue, total.AsApproximateFloat64(), resourceType, resourceType, nodeGroup.NodeType, nodeGroup.NodeType)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions internal/scheduler/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ const (
priorityClassLabel = "priority_class"
nodeLabel = "node"
nodeTypeLabel = "nodeType"
nodeTypeLabelSnake = "node_type"
clusterLabel = "cluster"
errorCategoryLabel = "category"
errorSubcategoryLabel = "subcategory"
stateLabel = "state"
priorStateLabel = "priorState"
priorStateLabelSnake = "prior_state"
resourceLabel = "resource"
reservationLabel = "reservation"
schedulableLabel = "schedulable"
overAllocatedLabel = "overAllocated"
overAllocatedLabelSnake = "over_allocated"
physicalPoolLabel = "physical_pool"
capacityClassLabel = "capacity_class"
jobShapeLabel = "job_shape"
Expand Down
14 changes: 8 additions & 6 deletions internal/scheduler/metrics/cycle_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
poolAndShapeAndReasonLabels = []string{poolLabel, jobShapeLabel, unschedulableReasonLabel}
poolQueueAndResourceLabels = []string{poolLabel, queueLabel, resourceLabel}
poolAndOutcomeLabels = []string{poolLabel, outcomeLabel, terminationReasonLabel}
nodeLabels = []string{poolLabel, nodeLabel, clusterLabel, nodeTypeLabel, resourceLabel, reservationLabel, schedulableLabel, overAllocatedLabel, physicalPoolLabel, capacityClassLabel}
nodeLabels = []string{poolLabel, nodeLabel, clusterLabel, nodeTypeLabel, nodeTypeLabelSnake, resourceLabel, reservationLabel, schedulableLabel, overAllocatedLabel, overAllocatedLabelSnake, physicalPoolLabel, capacityClassLabel}
defaultType = "unknown"
reconcilerFailureType = "reconciler"
)
Expand Down Expand Up @@ -253,7 +253,7 @@ func newPerCycleMetrics() *perCycleMetrics {
Name: prefix + "node_preemptibility",
Help: "is it possible to clear this node by preempting any jobs on it?",
},
[]string{poolLabel, nodeLabel, clusterLabel, nodeTypeLabel, "isPreemptible", "reason"},
[]string{poolLabel, nodeLabel, clusterLabel, nodeTypeLabel, nodeTypeLabelSnake, "isPreemptible", "is_preemptible", "reason"},
)

protectedFractionOfFairShare := prometheus.NewGaugeVec(
Expand Down Expand Up @@ -597,6 +597,8 @@ func (m *cycleMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result
nodePreemptiblityStats.NodeName,
nodePreemptiblityStats.Cluster,
nodePreemptiblityStats.NodeType,
nodePreemptiblityStats.NodeType,
fmt.Sprintf("%t", nodePreemptiblityStats.Preemptible),
fmt.Sprintf("%t", nodePreemptiblityStats.Preemptible),
nodePreemptiblityStats.Reason).Set(1.0)
}
Expand All @@ -615,15 +617,15 @@ func (m *cycleMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result
nodeCapacityClass = CapacityClassShared
}
for _, resource := range node.GetAllocatableResources().GetAll() {
currentCycle.nodeAllocatableResource.WithLabelValues(pool, node.GetName(), node.GetExecutor(), node.GetReportingNodeType(), resource.Name, node.GetReservation(),
isSchedulable, isOverallocated, node.GetPool(), nodeCapacityClass).Set(resource.Value.AsApproximateFloat64())
currentCycle.nodeAllocatableResource.WithLabelValues(pool, node.GetName(), node.GetExecutor(), node.GetReportingNodeType(), node.GetReportingNodeType(), resource.Name, node.GetReservation(),
isSchedulable, isOverallocated, isOverallocated, node.GetPool(), nodeCapacityClass).Set(resource.Value.AsApproximateFloat64())
}

allocated := node.GetAllocatableResources().Subtract(node.AllocatableByPriority[internaltypes.EvictedPriority])
for _, resource := range allocated.GetAll() {
allocatableValue := math.Max(resource.Value.AsApproximateFloat64(), 0)
currentCycle.nodeAllocatedResource.WithLabelValues(pool, node.GetName(), node.GetExecutor(), node.GetReportingNodeType(), resource.Name, node.GetReservation(),
isSchedulable, isOverallocated, node.GetPool(), nodeCapacityClass).Set(allocatableValue)
currentCycle.nodeAllocatedResource.WithLabelValues(pool, node.GetName(), node.GetExecutor(), node.GetReportingNodeType(), node.GetReportingNodeType(), resource.Name, node.GetReservation(),
isSchedulable, isOverallocated, isOverallocated, node.GetPool(), nodeCapacityClass).Set(allocatableValue)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/metrics/cycle_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestResetLeaderMetrics_ResetsLatestCycleMetrics(t *testing.T) {
poolLabelValues := []string{"pool1"}
poolQueueLabelValues := []string{"pool1", "queue1"}
poolQueueResourceLabelValues := []string{"pool1", "queue1", "cpu"}
nodeResourceLabelValues := []string{"pool1", "node1", "cluster1", "type1", "cpu", "", "true", "false", "pool1", CapacityClassDedicated}
nodeResourceLabelValues := []string{"pool1", "node1", "cluster1", "type1", "type1", "cpu", "", "true", "false", "false", "pool1", CapacityClassDedicated}

testResetGauge := func(getVec func(metrics *cycleMetrics) *prometheus.GaugeVec, labelValues []string) {
vec := getVec(m)
Expand Down Expand Up @@ -227,8 +227,8 @@ func TestDisableLeaderMetrics(t *testing.T) {
m.latestCycleMetrics.Load().loopNumber.WithLabelValues("pool1").Inc()
m.latestCycleMetrics.Load().evictedJobs.WithLabelValues("pool1", "queue1").Inc()
m.latestCycleMetrics.Load().evictedResources.WithLabelValues("pool1", "queue1", "cpu").Inc()
m.latestCycleMetrics.Load().nodeAllocatableResource.WithLabelValues("pool1", "node1", "cluster1", "type1", "cpu", "", "true", "false", "pool1", CapacityClassDedicated).Inc()
m.latestCycleMetrics.Load().nodeAllocatedResource.WithLabelValues("pool1", "node1", "cluster1", "type1", "cpu", "", "true", "false", "pool1", CapacityClassDedicated).Inc()
m.latestCycleMetrics.Load().nodeAllocatableResource.WithLabelValues("pool1", "node1", "cluster1", "type1", "type1", "cpu", "", "true", "false", "false", "pool1", CapacityClassDedicated).Inc()
m.latestCycleMetrics.Load().nodeAllocatedResource.WithLabelValues("pool1", "node1", "cluster1", "type1", "type1", "cpu", "", "true", "false", "false", "pool1", CapacityClassDedicated).Inc()
m.latestCycleMetrics.Load().nodePoolSize.WithLabelValues("pool1").Inc()

ch := make(chan prometheus.Metric, 1000)
Expand Down
24 changes: 12 additions & 12 deletions internal/scheduler/metrics/state_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,42 +53,42 @@ func newJobStateMetrics(
Name: prefix + "job_state_counter_by_queue",
Help: "Job states at queue level",
},
[]string{queueLabel, poolLabel, stateLabel, priorStateLabel},
[]string{queueLabel, poolLabel, stateLabel, priorStateLabel, priorStateLabelSnake},
)
jobStateCounterByNode := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "job_state_counter_by_node",
Help: "Job states at node level",
},
[]string{nodeLabel, poolLabel, clusterLabel, stateLabel, priorStateLabel},
[]string{nodeLabel, poolLabel, clusterLabel, stateLabel, priorStateLabel, priorStateLabelSnake},
)
jobStateSecondsByQueue := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "job_state_seconds_by_queue",
Help: "time spent in different states at the queue level",
},
[]string{queueLabel, poolLabel, stateLabel, priorStateLabel},
[]string{queueLabel, poolLabel, stateLabel, priorStateLabel, priorStateLabelSnake},
)
jobStateSecondsByNode := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "job_state_seconds_by_node",
Help: "time spent in different states at the node level",
},
[]string{nodeLabel, poolLabel, clusterLabel, stateLabel, priorStateLabel},
[]string{nodeLabel, poolLabel, clusterLabel, stateLabel, priorStateLabel, priorStateLabelSnake},
)
jobStateResourceSecondsByQueue := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "job_state_resource_seconds_by_queue",
Help: "Resource-seconds spent in different states at the queue level",
},
[]string{queueLabel, poolLabel, stateLabel, priorStateLabel, resourceLabel},
[]string{queueLabel, poolLabel, stateLabel, priorStateLabel, priorStateLabelSnake, resourceLabel},
)
jobStateResourceSecondsByNode := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "job_state_resource_seconds_by_node",
Help: "Resource-seconds spent in different states at the node level",
},
[]string{nodeLabel, poolLabel, clusterLabel, stateLabel, priorStateLabel, resourceLabel},
[]string{nodeLabel, poolLabel, clusterLabel, stateLabel, priorStateLabel, priorStateLabelSnake, resourceLabel},
)
jobErrorsByQueue := prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -270,27 +270,27 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio

// Counters
m.jobStateCounterByQueue.
WithLabelValues(queue, pool, state, priorState).Inc()
WithLabelValues(queue, pool, state, priorState, priorState).Inc()

m.jobStateCounterByNode.
WithLabelValues(node, pool, cluster, state, priorState).Inc()
WithLabelValues(node, pool, cluster, state, priorState, priorState).Inc()

// State seconds
m.jobStateSecondsByQueue.
WithLabelValues(queue, pool, state, priorState).Add(duration)
WithLabelValues(queue, pool, state, priorState, priorState).Add(duration)

m.jobStateSecondsByNode.
WithLabelValues(node, pool, cluster, state, priorState).Add(duration)
WithLabelValues(node, pool, cluster, state, priorState, priorState).Add(duration)

// Resource Seconds
for _, res := range m.trackedResourceNames {
resQty := requests.GetByNameZeroIfMissing(string(res))
resSeconds := duration * float64(resQty.MilliValue()) / 1000
resSeconds = math.Max(resSeconds, 0)
m.jobStateResourceSecondsByQueue.
WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds)
WithLabelValues(queue, pool, state, priorState, priorState, res.String()).Add(resSeconds)
m.jobStateResourceSecondsByNode.
WithLabelValues(node, pool, cluster, state, priorState, res.String()).Add(resSeconds)
WithLabelValues(node, pool, cluster, state, priorState, priorState, res.String()).Add(resSeconds)
}
}

Expand Down
Loading