diff --git a/code/go-utils/config/config.go b/code/go-utils/config/config.go index f96908a5..01028b6a 100644 --- a/code/go-utils/config/config.go +++ b/code/go-utils/config/config.go @@ -144,11 +144,6 @@ func NewClusterConfig( } } -// Finds the node configuration closest in time to the time stamp. Returns nil if not found. -func (cc *ClusterConfig) LookupHostByTime(hostname string, timestamp string) *NodeConfigRecord { - panic("No support for the time dimension of config yet") -} - // Returns the most recent node configuration for the node. Returns nil if not found. func (cc *ClusterConfig) LookupHost(hostname string) *NodeConfigRecord { if probe, found := cc.nodes[hostname]; found { diff --git a/code/sonalyze/cmd/config.go b/code/sonalyze/cmd/config.go index 2477e627..5b681951 100644 --- a/code/sonalyze/cmd/config.go +++ b/code/sonalyze/cmd/config.go @@ -14,7 +14,7 @@ import ( // // Over time this may become more complicated, as the config becomes time-dependent. func EnsureConfigForInputStreams( - cfg *config.ConfigDataProvider, + cdp *config.ConfigDataProvider, streams sample.InputStreamSet, reason string, ) (sample.InputStreamSet, error) { @@ -23,7 +23,7 @@ func EnsureConfigForInputStreams( for key, stream := range streams { hn := (*stream)[0].Hostname ts := (*stream)[0].Timestamp - if cfg.LookupHostByTime(hn, ts) == nil { + if cdp.LookupSingleHostByTime(hn, ts) == nil { bad[key] = true Log.Infof("Warning: Missing host configuration for %s", hn.String()) } diff --git a/code/sonalyze/cmd/jobs/perform.go b/code/sonalyze/cmd/jobs/perform.go index 7717285d..33920834 100644 --- a/code/sonalyze/cmd/jobs/perform.go +++ b/code/sonalyze/cmd/jobs/perform.go @@ -156,17 +156,17 @@ func (jc *JobsCommand) Perform( Log.Infof("Samples retained after filtering: %d", numSamples) } - cfg := config.MaybeOpenConfigDataProvider(meta) + cdp := config.MaybeOpenConfigDataProvider(meta) if NeedsConfig(jobsFormatters, jc.PrintFields) { var err error - streams, err = 
EnsureConfigForInputStreams(cfg, streams, "relative format arguments") + streams, err = EnsureConfigForInputStreams(cdp, streams, "relative format arguments") if err != nil { return err } } - summaries := jc.summarizeAndFilterJobs(meta, cfg, streams, bounds) + summaries := jc.summarizeAndFilterJobs(meta, cdp, streams, bounds) if Verbose { Log.Infof("Jobs after aggregation filtering: %d", len(summaries)) } @@ -187,7 +187,7 @@ func (jc *JobsCommand) Perform( func (jc *JobsCommand) summarizeAndFilterJobs( meta types.Context, - cfg *config.ConfigDataProvider, + cdp *config.ConfigDataProvider, streams sample.InputStreamSet, bounds Timebounds, ) []*jobSummary { @@ -210,7 +210,7 @@ func (jc *JobsCommand) summarizeAndFilterJobs( needZombie: jc.Zombie, } summaries, discarded, fb := - jc.summarizeJobsFromSonarData(cfg, bounds, jobs, summaryFilter, fb) + jc.summarizeJobsFromSonarData(cdp, bounds, jobs, summaryFilter, fb) if Verbose { Log.Infof("Jobs discarded by aggregation filtering: %d", discarded) } @@ -228,7 +228,7 @@ func (jc *JobsCommand) summarizeAndFilterJobs( } func (jc *JobsCommand) summarizeJobsFromSonarData( - cfg *config.ConfigDataProvider, + cdp *config.ConfigDataProvider, bounds Timebounds, jobs sample.MergedJobs, summaryFilter *aggregationFilter, @@ -256,7 +256,7 @@ func (jc *JobsCommand) summarizeJobsFromSonarData( discarded := 0 for _, job := range jobs { if uint(len(job.Samples)) >= minSamples { - summary := summarizeSingleJobFromSonarData(cfg, bounds, job, now, fb) + summary := summarizeSingleJobFromSonarData(cdp, bounds, job, now, fb) if summaryFilter == nil || summaryFilter.apply(summary) { summaries = append(summaries, summary) } else { @@ -271,7 +271,7 @@ func (jc *JobsCommand) summarizeJobsFromSonarData( // Aggregate and summarize but do not attach any sacct data. 
func summarizeSingleJobFromSonarData( - cfg *config.ConfigDataProvider, + cdp *config.ConfigDataProvider, bounds Timebounds, job sample.MergedJob, now int64, @@ -284,7 +284,7 @@ func summarizeSingleJobFromSonarData( first := samples[0].Timestamp last := samples[len(samples)-1].Timestamp duration := last - first - aggregate := aggregateSingleJobFromSonarData(cfg, host, samples, fb) + aggregate := aggregateSingleJobFromSonarData(cdp, host, samples, fb) aggregate.u64[uDurationSec] = uint64(duration) usesGpu := !aggregate.Gpus.IsEmpty() flags := 0 @@ -357,7 +357,7 @@ func summarizeSingleJobFromSonarData( // duplicated timestamps, return a JobAggregate for the job, with values that are computed from all // log entries. func aggregateSingleJobFromSonarData( - cfg *config.ConfigDataProvider, + cdp *config.ConfigDataProvider, host Ustr, job []sample.Sample, fb flagBag, @@ -417,7 +417,7 @@ func aggregateSingleJobFromSonarData( } usesGpu := !gpus.IsEmpty() - if sys := cfg.LookupHostByTime(host, job[0].Timestamp); sys != nil { + if sys := cdp.LookupMergedHostByTime(host, job[0].Timestamp); sys != nil { // Quantities can be zero in surprising ways, so always guard divisions if cores := float64(sys.CpuCores); cores > 0 { rCpuPctAvg = cpuPctAvg / cores diff --git a/code/sonalyze/cmd/load/perform.go b/code/sonalyze/cmd/load/perform.go index 04dada80..bda3f4d2 100644 --- a/code/sonalyze/cmd/load/perform.go +++ b/code/sonalyze/cmd/load/perform.go @@ -53,11 +53,11 @@ func (lc *LoadCommand) Perform( } fromIncl, toIncl := lc.InterpretFromToWithBounds(bounds) - cfg := config.MaybeOpenConfigDataProvider(meta) + cdp := config.MaybeOpenConfigDataProvider(meta) if NeedsConfig(loadFormatters, lc.PrintFields) { var err error - streams, err = EnsureConfigForInputStreams(cfg, streams, "relative format arguments") + streams, err = EnsureConfigForInputStreams(cdp, streams, "relative format arguments") if err != nil { return err } @@ -97,7 +97,7 @@ func (lc *LoadCommand) Perform( var mergedConf 
*repr.NodeSummary if lc.Group { for _, stream := range mergedStreams { - probe := cfg.LookupHostByTime(stream.Samples[0].Hostname, stream.Samples[0].Timestamp) + probe := cdp.LookupMergedHostByTime(stream.Samples[0].Hostname, stream.Samples[0].Timestamp) if probe == nil { continue } @@ -140,7 +140,7 @@ func (lc *LoadCommand) Perform( ts := stream.Samples[0].Timestamp conf := mergedConf if conf == nil { - conf = cfg.LookupHostByTime(hn, ts) + conf = cdp.LookupMergedHostByTime(hn, ts) } rs := generateReport(stream.Samples, time.Now().Unix(), conf) if queryNeg != nil { diff --git a/code/sonalyze/data/config/config.go b/code/sonalyze/data/config/config.go index fc1c0975..83919fd3 100644 --- a/code/sonalyze/data/config/config.go +++ b/code/sonalyze/data/config/config.go @@ -4,12 +4,14 @@ package config import ( "fmt" + "iter" "math" "slices" "sync" "sync/atomic" "time" + "go-utils/hostglob" umaps "go-utils/maps" . "sonalyze/common" @@ -90,13 +92,89 @@ func MaybeOpenConfigDataProvider(meta types.Context) *ConfigDataProvider { } } +// This can return nil. It may be very expensive. We expand the host set and aggregate the data +// from all the nodes in the set. To defray the expense, we cache. The cache can grow large and +// may need to be purged. See end. 
+func (cdp *ConfigDataProvider) LookupMergedHostByTime(host Ustr, t int64) *repr.NodeSummary {
+	// An invalid provider has nothing to offer.  Bail out before touching cdp.data, exactly as
+	// LookupSingleHostByTime does, so that we neither expand the host set for nothing nor
+	// dereference provider state that may not have been set up.
+	if !cdp.valid {
+		return nil
+	}
+
+	// NOTE(review): the cache is keyed on the merged host name only, so `t` influences the result
+	// only on the first successful lookup for that name - confirm that this is intended.
+	key := host.String()
+	perCluster := cdp.data
+	perCluster.hostTableLock.Lock()
+	cached := perCluster.multihostTable[key]
+	perCluster.hostTableLock.Unlock()
+	if cached != nil {
+		return cached
+	}
+
+	// Cache miss: expand the name and fold every node that has a configuration into one summary.
+	// The lock is not held while we do this.
+	var result *repr.NodeSummary
+	var count int
+	for hn := range expandMergedHostname(key) {
+		x := cdp.LookupSingleHostByTime(hn, t)
+		if x == nil {
+			continue
+		}
+		if result == nil {
+			// Start from a shallow copy so that the accumulation below does not write through
+			// to the per-host record.
+			result = &repr.NodeSummary{}
+			*result = *x
+		} else {
+			// Latest timestamp wins; capacities add up; boolean properties are or'ed together.
+			if x.Timestamp > result.Timestamp {
+				result.Timestamp = x.Timestamp
+			}
+			result.CrossNodeJobs = result.CrossNodeJobs || x.CrossNodeJobs
+			result.CpuCores += x.CpuCores
+			result.MemGB += x.MemGB
+			result.GpuCards += x.GpuCards
+			result.GpuMemGB += x.GpuMemGB
+			result.GpuMemPct = result.GpuMemPct || x.GpuMemPct
+		}
+		count++
+	}
+	if count > 1 {
+		// Only a true merge is renamed; a single hit keeps the name and description it had.
+		result.Hostname = key
+		result.Description = "merged data"
+	}
+
+	perCluster.hostTableLock.Lock()
+	// Might have appeared while we were busy
+	if info := perCluster.multihostTable[key]; info != nil {
+		result = info
+	} else if result != nil {
+		// Misses (nil) are deliberately not cached, they are recomputed on the next call.
+		perCluster.multihostTable[key] = result
+	}
+	perCluster.hostTableLock.Unlock()
+
+	return result
+}
+
+// Error handling is not satisfactory, nor is the half-iterator, half-eager-expansion, but this is
+// an abstraction that will disappear with a different representation of multi-hostnames.
+//
+// The name is split into patterns when this function is called; each pattern is expanded only as
+// the returned sequence is consumed.  A name that cannot be split yields an empty sequence, and a
+// pattern that cannot be expanded is skipped.
+func expandMergedHostname(s string) iter.Seq[Ustr] {
+	patterns, err := hostglob.SplitMultiPattern(s)
+	if err != nil {
+		// Best effort: the error is dropped and the caller sees no hosts.
+		return func(yield func(Ustr) bool) {}
+	}
+	return func(yield func(Ustr) bool) {
+		for _, p := range patterns {
+			ss, err := hostglob.ExpandPattern(p)
+			if err != nil {
+				// Best effort: ignore this pattern and carry on with the rest.
+				continue
+			}
+			for _, hn := range ss {
+				if !yield(StringToUstr(hn)) {
+					return
+				}
+			}
+		}
+	}
+}
+
 // This can return nil. We want the latest host information at or before the given time, which is
 // seconds since Unix epoch UTC. 
If the database has to be queried, the query window into the past // may be limited to 14 days. The result is not necessarily stable, it may change if new data come // in, but will never revert to older data. New data that replace a prior non-nil result may or may // not be honored in a timely manner. A static cluster configuration, should it exist, will be // consulted only if the information can't be found in the database. -func (cdp *ConfigDataProvider) LookupHostByTime(host Ustr, t int64) *repr.NodeSummary { +func (cdp *ConfigDataProvider) LookupSingleHostByTime(host Ustr, t int64) *repr.NodeSummary { if !cdp.valid { return nil } @@ -381,15 +459,17 @@ func (cdp *ConfigDataProvider) materialize(qa QueryArgs) ([]*NodeConfig, error) // per-host-info : always-sorted timestamp-unique list of records + metadata type perClusterInfo struct { - name string - hostTableLock sync.Mutex - hostTable map[string]*hostInfo + name string + hostTableLock sync.Mutex + hostTable map[string]*hostInfo + multihostTable map[string]*repr.NodeSummary } func makePerClusterInfo(name string) *perClusterInfo { return &perClusterInfo{ - name: name, - hostTable: make(map[string]*hostInfo), + name: name, + hostTable: make(map[string]*hostInfo), + multihostTable: make(map[string]*repr.NodeSummary), } } diff --git a/code/tests/sonalyzed/simple.sh b/code/tests/sonalyzed/simple.sh index 8b8d9f14..6741d5c3 100755 --- a/code/tests/sonalyzed/simple.sh +++ b/code/tests/sonalyzed/simple.sh @@ -38,17 +38,17 @@ sleep 1 # First, try to insert some data and verify that the data have been added as expected -curl --fail-with-body --data-binary @cluster1-samples.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ - http://$testapi/api/v1/insert/sample +curl --silent --fail-with-body --data-binary @cluster1-samples.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ + http://$testapi/api/v1/insert/sample > /dev/null -curl --fail-with-body --data-binary 
@cluster1-sysinfo.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ - http://$testapi/api/v1/insert/sysinfo +curl --silent --fail-with-body --data-binary @cluster1-sysinfo.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ + http://$testapi/api/v1/insert/sysinfo > /dev/null -curl --fail-with-body --data-binary @cluster2-samples.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ - http://$testapi/api/v1/insert/sample +curl --silent --fail-with-body --data-binary @cluster2-samples.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ + http://$testapi/api/v1/insert/sample > /dev/null -curl --fail-with-body --data-binary @cluster2-sysinfo.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ - http://$testapi/api/v1/insert/sysinfo +curl --silent --fail-with-body --data-binary @cluster2-sysinfo.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ + http://$testapi/api/v1/insert/sysinfo > /dev/null sleep 1