Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions code/go-utils/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ func NewClusterConfig(
}
}

// LookupHostByTime is meant to find the node configuration closest in time to the time stamp, but
// the time dimension of the config is not supported yet: the body below is a single unconditional
// panic, so this method never returns a record (and never returns nil either).
func (cc *ClusterConfig) LookupHostByTime(hostname string, timestamp string) *NodeConfigRecord {
panic("No support for the time dimension of config yet")
}

// Returns the most recent node configuration for the node. Returns nil if not found.
func (cc *ClusterConfig) LookupHost(hostname string) *NodeConfigRecord {
if probe, found := cc.nodes[hostname]; found {
Expand Down
4 changes: 2 additions & 2 deletions code/sonalyze/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
//
// Over time this may become more complicated, as the config becomes time-dependent.
func EnsureConfigForInputStreams(
cfg *config.ConfigDataProvider,
cdp *config.ConfigDataProvider,
streams sample.InputStreamSet,
reason string,
) (sample.InputStreamSet, error) {
Expand All @@ -23,7 +23,7 @@ func EnsureConfigForInputStreams(
for key, stream := range streams {
hn := (*stream)[0].Hostname
ts := (*stream)[0].Timestamp
if cfg.LookupHostByTime(hn, ts) == nil {
if cdp.LookupSingleHostByTime(hn, ts) == nil {
bad[key] = true
Log.Infof("Warning: Missing host configuration for %s", hn.String())
}
Expand Down
22 changes: 11 additions & 11 deletions code/sonalyze/cmd/jobs/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ func (jc *JobsCommand) Perform(
Log.Infof("Samples retained after filtering: %d", numSamples)
}

cfg := config.MaybeOpenConfigDataProvider(meta)
cdp := config.MaybeOpenConfigDataProvider(meta)

if NeedsConfig(jobsFormatters, jc.PrintFields) {
var err error
streams, err = EnsureConfigForInputStreams(cfg, streams, "relative format arguments")
streams, err = EnsureConfigForInputStreams(cdp, streams, "relative format arguments")
if err != nil {
return err
}
}

summaries := jc.summarizeAndFilterJobs(meta, cfg, streams, bounds)
summaries := jc.summarizeAndFilterJobs(meta, cdp, streams, bounds)
if Verbose {
Log.Infof("Jobs after aggregation filtering: %d", len(summaries))
}
Expand All @@ -187,7 +187,7 @@ func (jc *JobsCommand) Perform(

func (jc *JobsCommand) summarizeAndFilterJobs(
meta types.Context,
cfg *config.ConfigDataProvider,
cdp *config.ConfigDataProvider,
streams sample.InputStreamSet,
bounds Timebounds,
) []*jobSummary {
Expand All @@ -210,7 +210,7 @@ func (jc *JobsCommand) summarizeAndFilterJobs(
needZombie: jc.Zombie,
}
summaries, discarded, fb :=
jc.summarizeJobsFromSonarData(cfg, bounds, jobs, summaryFilter, fb)
jc.summarizeJobsFromSonarData(cdp, bounds, jobs, summaryFilter, fb)
if Verbose {
Log.Infof("Jobs discarded by aggregation filtering: %d", discarded)
}
Expand All @@ -228,7 +228,7 @@ func (jc *JobsCommand) summarizeAndFilterJobs(
}

func (jc *JobsCommand) summarizeJobsFromSonarData(
cfg *config.ConfigDataProvider,
cdp *config.ConfigDataProvider,
bounds Timebounds,
jobs sample.MergedJobs,
summaryFilter *aggregationFilter,
Expand Down Expand Up @@ -256,7 +256,7 @@ func (jc *JobsCommand) summarizeJobsFromSonarData(
discarded := 0
for _, job := range jobs {
if uint(len(job.Samples)) >= minSamples {
summary := summarizeSingleJobFromSonarData(cfg, bounds, job, now, fb)
summary := summarizeSingleJobFromSonarData(cdp, bounds, job, now, fb)
if summaryFilter == nil || summaryFilter.apply(summary) {
summaries = append(summaries, summary)
} else {
Expand All @@ -271,7 +271,7 @@ func (jc *JobsCommand) summarizeJobsFromSonarData(

// Aggregate and summarize but do not attach any sacct data.
func summarizeSingleJobFromSonarData(
cfg *config.ConfigDataProvider,
cdp *config.ConfigDataProvider,
bounds Timebounds,
job sample.MergedJob,
now int64,
Expand All @@ -284,7 +284,7 @@ func summarizeSingleJobFromSonarData(
first := samples[0].Timestamp
last := samples[len(samples)-1].Timestamp
duration := last - first
aggregate := aggregateSingleJobFromSonarData(cfg, host, samples, fb)
aggregate := aggregateSingleJobFromSonarData(cdp, host, samples, fb)
aggregate.u64[uDurationSec] = uint64(duration)
usesGpu := !aggregate.Gpus.IsEmpty()
flags := 0
Expand Down Expand Up @@ -357,7 +357,7 @@ func summarizeSingleJobFromSonarData(
// duplicated timestamps, return a JobAggregate for the job, with values that are computed from all
// log entries.
func aggregateSingleJobFromSonarData(
cfg *config.ConfigDataProvider,
cdp *config.ConfigDataProvider,
host Ustr,
job []sample.Sample,
fb flagBag,
Expand Down Expand Up @@ -417,7 +417,7 @@ func aggregateSingleJobFromSonarData(
}
usesGpu := !gpus.IsEmpty()

if sys := cfg.LookupHostByTime(host, job[0].Timestamp); sys != nil {
if sys := cdp.LookupMergedHostByTime(host, job[0].Timestamp); sys != nil {
// Quantities can be zero in surprising ways, so always guard divisions
if cores := float64(sys.CpuCores); cores > 0 {
rCpuPctAvg = cpuPctAvg / cores
Expand Down
8 changes: 4 additions & 4 deletions code/sonalyze/cmd/load/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ func (lc *LoadCommand) Perform(
}

fromIncl, toIncl := lc.InterpretFromToWithBounds(bounds)
cfg := config.MaybeOpenConfigDataProvider(meta)
cdp := config.MaybeOpenConfigDataProvider(meta)

if NeedsConfig(loadFormatters, lc.PrintFields) {
var err error
streams, err = EnsureConfigForInputStreams(cfg, streams, "relative format arguments")
streams, err = EnsureConfigForInputStreams(cdp, streams, "relative format arguments")
if err != nil {
return err
}
Expand Down Expand Up @@ -97,7 +97,7 @@ func (lc *LoadCommand) Perform(
var mergedConf *repr.NodeSummary
if lc.Group {
for _, stream := range mergedStreams {
probe := cfg.LookupHostByTime(stream.Samples[0].Hostname, stream.Samples[0].Timestamp)
probe := cdp.LookupMergedHostByTime(stream.Samples[0].Hostname, stream.Samples[0].Timestamp)
if probe == nil {
continue
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (lc *LoadCommand) Perform(
ts := stream.Samples[0].Timestamp
conf := mergedConf
if conf == nil {
conf = cfg.LookupHostByTime(hn, ts)
conf = cdp.LookupMergedHostByTime(hn, ts)
}
rs := generateReport(stream.Samples, time.Now().Unix(), conf)
if queryNeg != nil {
Expand Down
92 changes: 86 additions & 6 deletions code/sonalyze/data/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package config

import (
"fmt"
"iter"
"math"
"slices"
"sync"
"sync/atomic"
"time"

"go-utils/hostglob"
umaps "go-utils/maps"

. "sonalyze/common"
Expand Down Expand Up @@ -90,13 +92,89 @@ func MaybeOpenConfigDataProvider(meta types.Context) *ConfigDataProvider {
}
}

// LookupMergedHostByTime returns a summary for `host` at time `t`, or nil if nothing is found.
// The host name is expanded with expandMergedHostname and every resulting name is looked up with
// LookupSingleHostByTime. The records that are found are aggregated: CpuCores, MemGB, GpuCards
// and GpuMemGB are summed, CrossNodeJobs and GpuMemPct are or'ed, and the largest Timestamp is
// kept. When more than one record contributed, Hostname is set to the merged name and
// Description to "merged data".
//
// This may be very expensive. To defray the expense, non-nil results are cached. The cache can
// grow large and may need to be purged. See end.
//
// NOTE(review): the cache is keyed on the host name only, not on `t`, so once a result is cached
// for a host every later call gets that result regardless of the time asked for - confirm that
// this is intended.
//
// The returned record is the one stored in the cache, so it is shared between callers;
// presumably it must be treated as read-only.
func (cdp *ConfigDataProvider) LookupMergedHostByTime(host Ustr, t int64) *repr.NodeSummary {
	// Same guard as LookupSingleHostByTime. Every per-host lookup would return nil for an
	// invalid provider anyway, and bailing out here avoids touching cdp.data, which may not be
	// set up in that case (TODO confirm).
	if !cdp.valid {
		return nil
	}

	key := host.String()
	perCluster := cdp.data

	perCluster.hostTableLock.Lock()
	cached := perCluster.multihostTable[key]
	perCluster.hostTableLock.Unlock()
	if cached != nil {
		return cached
	}

	// The lock is deliberately not held while expanding and looking up the individual hosts.
	var result *repr.NodeSummary
	var count int
	for hn := range expandMergedHostname(key) {
		x := cdp.LookupSingleHostByTime(hn, t)
		if x == nil {
			continue
		}
		count++
		if result == nil {
			// Start from a copy so that accumulating into it does not modify the record
			// returned for the single host.
			merged := *x
			result = &merged
			continue
		}
		if x.Timestamp > result.Timestamp {
			result.Timestamp = x.Timestamp
		}
		result.CrossNodeJobs = result.CrossNodeJobs || x.CrossNodeJobs
		result.CpuCores += x.CpuCores
		result.MemGB += x.MemGB
		result.GpuCards += x.GpuCards
		result.GpuMemGB += x.GpuMemGB
		result.GpuMemPct = result.GpuMemPct || x.GpuMemPct
	}
	if count > 1 {
		result.Hostname = key
		result.Description = "merged data"
	}

	perCluster.hostTableLock.Lock()
	defer perCluster.hostTableLock.Unlock()
	// An entry might have appeared while we were busy; if so, that entry wins so that all
	// callers observe the same record.
	if winner := perCluster.multihostTable[key]; winner != nil {
		return winner
	}
	if result != nil {
		perCluster.multihostTable[key] = result
	}
	return result
}

// expandMergedHostname yields, as Ustr values, the host names obtained by splitting `s` into
// patterns and expanding each pattern in turn. The split happens eagerly when this function is
// called, the per-pattern expansion lazily during iteration.
//
// Errors are dropped: if `s` can't be split the sequence is empty, and a pattern that can't be
// expanded contributes nothing. That is not satisfactory, nor is the half-iterator,
// half-eager-expansion, but this is an abstraction that will disappear with a different
// representation of multi-hostnames.
func expandMergedHostname(s string) iter.Seq[Ustr] {
	patterns, splitErr := hostglob.SplitMultiPattern(s)
	if splitErr != nil {
		// Nothing to iterate over: the sequence below becomes empty.
		patterns = nil
	}
	return func(yield func(Ustr) bool) {
		for _, pattern := range patterns {
			names, expandErr := hostglob.ExpandPattern(pattern)
			if expandErr == nil {
				for _, name := range names {
					if more := yield(StringToUstr(name)); !more {
						return
					}
				}
			}
		}
	}
}

// This can return nil. We want the latest host information at or before the given time, which is
// seconds since Unix epoch UTC. If the database has to be queried, the query window into the past
// may be limited to 14 days. The result is not necessarily stable, it may change if new data come
// in, but will never revert to older data. New data that replace a prior non-nil result may or may
// not be honored in a timely manner. A static cluster configuration, should it exist, will be
// consulted only if the information can't be found in the database.
func (cdp *ConfigDataProvider) LookupHostByTime(host Ustr, t int64) *repr.NodeSummary {
func (cdp *ConfigDataProvider) LookupSingleHostByTime(host Ustr, t int64) *repr.NodeSummary {
if !cdp.valid {
return nil
}
Expand Down Expand Up @@ -381,15 +459,17 @@ func (cdp *ConfigDataProvider) materialize(qa QueryArgs) ([]*NodeConfig, error)
// per-host-info : always-sorted timestamp-unique list of records + metadata

// perClusterInfo holds the cached host data for one cluster.
type perClusterInfo struct {
	// Cluster name, as given to makePerClusterInfo.
	name string

	// Held while multihostTable is read or updated; presumably it guards hostTable in the same
	// way (verify against the code that uses hostTable).
	hostTableLock sync.Mutex

	// Per-host information; presumably keyed by host name (verify).
	hostTable map[string]*hostInfo

	// Cache of aggregated summaries produced by LookupMergedHostByTime, keyed by the string form
	// of the (possibly multi-host) name that was looked up.
	multihostTable map[string]*repr.NodeSummary
}

// makePerClusterInfo returns a perClusterInfo for the cluster `name` with both of its tables
// allocated and empty, so that they can be written to without further initialization.
func makePerClusterInfo(name string) *perClusterInfo {
	return &perClusterInfo{
		name:           name,
		hostTable:      make(map[string]*hostInfo),
		multihostTable: make(map[string]*repr.NodeSummary),
	}
}

Expand Down
16 changes: 8 additions & 8 deletions code/tests/sonalyzed/simple.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ sleep 1

# First, try to insert some data and verify that the data have been added as expected

curl --fail-with-body --data-binary @cluster1-samples.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \
http://$testapi/api/v1/insert/sample
curl --silent --fail-with-body --data-binary @cluster1-samples.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \
http://$testapi/api/v1/insert/sample > /dev/null

curl --fail-with-body --data-binary @cluster1-sysinfo.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \
http://$testapi/api/v1/insert/sysinfo
curl --silent --fail-with-body --data-binary @cluster1-sysinfo.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \
http://$testapi/api/v1/insert/sysinfo > /dev/null

curl --fail-with-body --data-binary @cluster2-samples.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \
http://$testapi/api/v1/insert/sample
curl --silent --fail-with-body --data-binary @cluster2-samples.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \
http://$testapi/api/v1/insert/sample > /dev/null

curl --fail-with-body --data-binary @cluster2-sysinfo.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \
http://$testapi/api/v1/insert/sysinfo
curl --silent --fail-with-body --data-binary @cluster2-sysinfo.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \
http://$testapi/api/v1/insert/sysinfo > /dev/null

sleep 1

Expand Down
Loading