diff --git a/grafana/rmf-app/.golangci.yml b/grafana/rmf-app/.golangci.yml index 18201ee5..18d44a27 100644 --- a/grafana/rmf-app/.golangci.yml +++ b/grafana/rmf-app/.golangci.yml @@ -1,4 +1,4 @@ -version: "2" +version: '2' run: allow-parallel-runners: true linters: @@ -11,7 +11,6 @@ linters: - durationcheck - errchkjson - errorlint - - exhaustive - fatcontext - gocheckcompilerdirectives - gochecksumtype diff --git a/grafana/rmf-app/BUILD.md b/grafana/rmf-app/BUILD.md index 63c6d061..811c1179 100644 --- a/grafana/rmf-app/BUILD.md +++ b/grafana/rmf-app/BUILD.md @@ -2,9 +2,9 @@ ## Pre-requisites -- `NodeJS`: >=16 -- `Yarn`: 1.x.x -- `Go`: >=1.21 +- `NodeJS`: see `package.json` +- `Yarn`: see `package.json` +- `Go`: see `go.mod` - `GNU Make`: >=3.81 - `jq`: >=v1.6 - `zip`: >=3.0 @@ -16,4 +16,4 @@ - Navigate to the directory `grafana/rmf-app`. - Execute the command: `make all`. This creates the `./build` directory and once successful you can find the - `ibm-rmf-grafana-.zip` and `ibm-rmf-grafana-.zip.md5` files there. + `ibm-rmf-grafana-.zip` and `ibm-rmf-grafana-.zip.md5` files there. diff --git a/grafana/rmf-app/makefile b/grafana/rmf-app/makefile index 48dc5e7d..2cc2fcb2 100644 --- a/grafana/rmf-app/makefile +++ b/grafana/rmf-app/makefile @@ -16,7 +16,7 @@ all: clean build zip ## Executes clean build and zip tasks ##@ Dependencies node_modules: package.json yarn.lock - yarn install --frozen-lockfile + yarn install --immutable deps-frontend: node_modules ## Install Node.js dependencies diff --git a/grafana/rmf-app/package.json b/grafana/rmf-app/package.json index dc62ac66..27503125 100644 --- a/grafana/rmf-app/package.json +++ b/grafana/rmf-app/package.json @@ -66,7 +66,7 @@ "webpack-virtual-modules": "^0.6.2" }, "engines": { - "node": ">=20" + "node": ">=22" }, "dependencies": { "@emotion/css": "^11.13.5", diff --git a/grafana/rmf-app/pkg/plugin/cache/channel.go b/grafana/rmf-app/pkg/plugin/cache/channel.go new file mode 100644 index 00000000..776b3d7c --- /dev/null +++ b/grafana/rmf-app/pkg/plugin/cache/channel.go @@ -0,0 +1,63 @@ +/** +* (C) Copyright IBM Corp. 2023, 2025. +* (C) Copyright Rocket Software, Inc. 2023-2025. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package cache + +import ( + "encoding/json" + "time" + + "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame" + "github.com/VictoriaMetrics/fastcache" + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +type ChannelCache struct { + cache *fastcache.Cache +} + +type Channel struct { + Resource string + TimeRange backend.TimeRange + Absolute bool + Step time.Duration + Fields frame.SeriesFields +} + +func NewChannelCache(size int) *ChannelCache { + return &ChannelCache{cache: fastcache.New(size * 1024 * 1024)} +} + +func (cc *ChannelCache) Reset() { + cc.cache.Reset() +} + +func (cc *ChannelCache) Get(path string) (*Channel, error) { + var c Channel + b := cc.cache.Get(nil, []byte(path)) + err := json.Unmarshal(b, &c) + return &c, err +} + +func (cc *ChannelCache) Set(path string, c *Channel) error { + b, err := json.Marshal(*c) + if err != nil { + return err + } + cc.cache.Set([]byte(path), b) + return nil +} diff --git a/grafana/rmf-app/pkg/plugin/cache/cache.go b/grafana/rmf-app/pkg/plugin/cache/frame.go similarity index 78% rename from grafana/rmf-app/pkg/plugin/cache/cache.go rename to grafana/rmf-app/pkg/plugin/cache/frame.go index ea16bc72..ea903afa 100644 --- a/grafana/rmf-app/pkg/plugin/cache/cache.go +++ b/grafana/rmf-app/pkg/plugin/cache/frame.go @@ -28,19 +28,19 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" ) -type Cache struct { +type FrameCache struct { cache *fastcache.Cache } -func NewFrameCache(size int) *Cache { - return &Cache{cache: fastcache.New(size * 1024 * 1024)} +func NewFrameCache(size int) *FrameCache { + return &FrameCache{cache: fastcache.New(size * 1024 * 1024)} } -func (fc *Cache) Reset() { +func (fc *FrameCache) Reset() { fc.cache.Reset() } -func Key(r *dds.Request, wide bool) []byte { +func FrameKey(r *dds.Request, wide bool) []byte { format := "long" if wide { format = "wide" @@ -48,10 +48,10 @@ func Key(r *dds.Request, wide bool) []byte { return []byte(fmt.Sprintf("%s[%s]@%d-%d", r.Resource, format, r.TimeRange.From.UnixMilli(), r.TimeRange.To.UnixMilli())) } -func (fc *Cache) GetFrame(r *dds.Request, wide bool) *data.Frame { +func (fc *FrameCache) Get(r *dds.Request, wide bool) *data.Frame { logger := log.Logger.With("func", "GetFrame") var frame data.Frame - key := Key(r, wide) + key := FrameKey(r, wide) buf := fc.cache.GetBig(nil, key) if buf != nil { err := json.Unmarshal(buf, &frame) @@ -65,9 +65,9 @@ func (fc *Cache) GetFrame(r *dds.Request, wide bool) *data.Frame { return nil } -func (fc *Cache) SaveFrame(f *data.Frame, r *dds.Request, wide bool) error { - key := Key(r, wide) - frame := fc.GetFrame(r, wide) +func (fc *FrameCache) Set(f *data.Frame, r *dds.Request, wide bool) error { + key := FrameKey(r, wide) + frame := fc.Get(r, wide) if frame != nil { return nil } diff --git a/grafana/rmf-app/pkg/plugin/channel.go b/grafana/rmf-app/pkg/plugin/channel.go deleted file mode 100644 index 4c6d9280..00000000 --- a/grafana/rmf-app/pkg/plugin/channel.go +++ /dev/null @@ -1,82 +0,0 @@ -/** -* (C) Copyright IBM Corp. 2023, 2025. -* (C) Copyright Rocket Software, Inc. 2023-2025. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. - */ - -package plugin - -import ( - "encoding/base64" - "errors" - "strconv" - "strings" - "time" - - "github.com/google/uuid" -) - -const Sep = ":" - -func encodeChannelPath(res string, from time.Time, to time.Time, absolute bool, interval time.Duration) string { - absFlag := "0" - if absolute { - absFlag = "1" - } - path := res + - Sep + strconv.FormatInt(from.Unix(), 10) + - Sep + strconv.FormatInt(to.Unix(), 10) + - Sep + absFlag + - Sep + strconv.FormatInt(int64(interval.Seconds()), 10) + - Sep + uuid.NewString()[:8] - return base64.StdEncoding.EncodeToString([]byte(path)) -} - -func decodeChannelPath(b string) (string, time.Time, time.Time, bool, time.Duration, error) { - var ( - res string - from time.Time - to time.Time - absolute bool - interval time.Duration - ) - path, err := base64.StdEncoding.DecodeString(b) - if err != nil { - return res, from, to, absolute, interval, err - } - parts := strings.Split(string(path), Sep) - if len(parts) != 6 { - return res, from, to, absolute, interval, errors.New("invalid number of elements") - } - res = parts[0] - if timestamp, err := strconv.ParseInt(parts[1], 10, 64); err != nil { - return res, from, to, absolute, interval, err - } else { - from = time.Unix(timestamp, 0) - } - if timestamp, err := strconv.ParseInt(parts[2], 10, 64); err != nil { - return res, from, to, absolute, interval, err - } else { - to = time.Unix(timestamp, 0) - } - if parts[3] == "1" { - absolute = true - } - if d, err := strconv.ParseInt(parts[4], 10, 64); err != nil { - return res, from, to, absolute, interval, err - } else { - interval = time.Duration(d) * time.Second - } - return res, from, to, absolute, interval, nil -} diff --git a/grafana/rmf-app/pkg/plugin/config.go b/grafana/rmf-app/pkg/plugin/config.go index 40c5c6eb..5ab25b57 100644 --- a/grafana/rmf-app/pkg/plugin/config.go +++ b/grafana/rmf-app/pkg/plugin/config.go @@ -90,7 +90,7 @@ func (ds *RMFDatasource) getConfig(settings backend.DataSourceInstanceSettings) } } if config.CacheSize, err = strconv.Atoi(config.JSON.CacheSizeRaw); err != nil { - logger.Warn("cache size is not valid, applying default", "cacheSize", config.JSON.CacheSizeRaw, "err", err) + logger.Warn("cache size is not valid, applying default", "cacheSizeRaw", config.JSON.CacheSizeRaw) config.CacheSize = DefaultCacheSizeMB } if config.CacheSize < MinimalCacheSizeMB { diff --git a/grafana/rmf-app/pkg/plugin/datasource.go b/grafana/rmf-app/pkg/plugin/datasource.go index e13f8920..790ed575 100644 --- a/grafana/rmf-app/pkg/plugin/datasource.go +++ b/grafana/rmf-app/pkg/plugin/datasource.go @@ -30,12 +30,10 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana-plugin-sdk-go/live" "golang.org/x/sync/singleflight" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/cache" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/dds" - "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/log" ) @@ -51,15 +49,17 @@ var ( _ backend.StreamHandler = (*RMFDatasource)(nil) ) +const ChannelCacheSizeMB = 64 const SdsDelay = 5 * time.Second const TimeSeriesType = "TimeSeries" type RMFDatasource struct { - uid string - name string - cache *cache.Cache - ddsClient *dds.Client - single singleflight.Group + uid string + name string + channelCache *cache.ChannelCache + frameCache *cache.FrameCache + ddsClient *dds.Client + single singleflight.Group } // NewRMFDatasource creates a new instance of the RMF datasource. @@ -74,7 +74,8 @@ func NewRMFDatasource(ctx context.Context, settings backend.DataSourceInstanceSe // nolint:contextcheck ds.ddsClient = dds.NewClient(config.URL, config.Username, config.Password, config.Timeout, config.JSON.TlsSkipVerify, config.JSON.DisableCompression) - ds.cache = cache.NewFrameCache(config.CacheSize) + ds.channelCache = cache.NewChannelCache(ChannelCacheSizeMB) + ds.frameCache = cache.NewFrameCache(config.CacheSize) logger.Info("initialized a datasource", "uid", settings.UID, "name", settings.Name, "url", config.URL, "timeout", config.Timeout, "cacheSize", config.CacheSize, @@ -89,7 +90,8 @@ func (ds *RMFDatasource) Dispose() { logger := log.Logger.With("func", "Dispose") // Recover from any panic so as to not bring down this backend datasource defer log.LogAndRecover(logger) - ds.cache.Reset() + ds.channelCache.Reset() + ds.frameCache.Reset() ds.ddsClient.Close() logger.Info("disposed datasource", "uid", ds.uid, "name", ds.name) } @@ -179,14 +181,6 @@ func (ds *RMFDatasource) CallResource(ctx context.Context, req *backend.CallReso } } -type RequestParams struct { - Resource struct { - Value string `json:"value"` - } `json:"selectedResource"` - AbsoluteTime bool `json:"absoluteTimeSelected"` - VisType string `json:"selectedVisualisationType"` -} - // QueryData handles multiple queries and returns multiple responses. // req contains the queries []DataQuery (where each query contains RefID as a unique identifier). // The QueryDataResponse contains a map of RefID to the response for each query, and each response @@ -224,7 +218,7 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe for _, query := range req.Queries { wg.Add(1) - go func(q backend.DataQuery) { + go func(q *backend.DataQuery) { defer wg.Done() var response *backend.DataResponse @@ -233,29 +227,28 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe if err != nil { response = &backend.DataResponse{Status: backend.StatusBadRequest, Error: err} + } else if params.Resource.Value == "" { + response = &backend.DataResponse{Status: backend.StatusOK} } else { mintime := ds.ddsClient.GetCachedMintime() if params.VisType == TimeSeriesType { // Initialize time series stream - from := q.TimeRange.From - f := frame.TaggedFrame(from, "No data yet...") - path := encodeChannelPath(params.Resource.Value, from, q.TimeRange.To, params.AbsoluteTime, q.Interval) - channel := live.Channel{ - Scope: live.ScopeDatasource, - Namespace: req.PluginContext.DataSourceInstanceSettings.UID, - Path: path, + step := getStep(mintime, q.Interval) + f, err := ds.getFirstTSFrame(¶ms, q.TimeRange, step) + if err != nil { + response = &backend.DataResponse{Status: backend.StatusInternal, Error: err} + } else { + response = &backend.DataResponse{Frames: data.Frames{f}} } - f.SetMeta(&data.FrameMeta{Channel: channel.String()}) - response = &backend.DataResponse{Frames: data.Frames{f}} } else { // Query non-timeseries data r := dds.NewRequest(params.Resource.Value, q.TimeRange.From, q.TimeRange.To, mintime) response = &backend.DataResponse{} // FIXME: doesn't it need to be cached? - if newFrame, err := ds.getFrame(ctx, r, false); err != nil { - // nolint:errorlint - if cause, ok := errors.Unwrap(err).(*dds.Message); ok { - response.Error = cause + if newFrame, err := ds.getFrame(r, false); err != nil { + var msg *dds.Message + if errors.As(err, &msg) { + response.Error = err response.Status = backend.StatusBadRequest } else { response.Error = log.FrameErrorWithId(logger, err) @@ -267,7 +260,7 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe } } responseChan <- ResponseWithId{refId: q.RefID, response: response} - }(query) + }(&query) } @@ -288,52 +281,32 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe // Recover from any panic so as to not bring down this backend datasource defer log.LogAndRecover(logger) - res, from, to, absolute, interval, err := decodeChannelPath(string(req.Path)) + // res, from, to, absolute, interval, err := decodeChannelPath(string(req.Path)) + c, err := ds.channelCache.Get(req.Path) if err != nil { - logger.Error("unable to decode channel path", "err", err) + logger.Error("unable to find channel", "err", err) return nil } + step := c.Step + absolute := c.Absolute + from := c.TimeRange.From + to := c.TimeRange.To + fields := c.Fields - // Calculate the most appropriate interval length, i.e. time series step. - // There's no ideal solution. We assume that it aligns with one hour. - // If it doesn't, streaming will still work, but some queries will miss cache. - mintime := ds.ddsClient.GetCachedMintime() - n := 3600 / int(mintime.Seconds()) - step := time.Hour // The maximum possible - for i := 1; i <= n; i++ { - if n%i == 0 && time.Duration(i)*mintime >= interval { - step = time.Duration(i) * mintime - break - } - } - logger.Debug("starting streaming", "step", step.String(), "interval", interval.String(), "path", req.Path) - - r := dds.NewRequest(res, from, from, step) - seriesFields := frame.SeriesFields{} + logger.Debug("starting streaming", "step", step.String(), "path", req.Path) + r := dds.NewRequest(c.Resource, from, from, step) // Stream historical part of time series + stopTime := to for { - if err := ctx.Err(); err != nil { - logger.Info("streaming stopped", "reason", err, "path", req.Path) - return nil + if !absolute { + stopTime = time.Now().Add(-SdsDelay) } - if !absolute && r.TimeRange.To.After(time.Now().Add(-SdsDelay)) || absolute && r.TimeRange.To.After(to) { + if r.TimeRange.To.After(stopTime) { logger.Debug("finished with historical data", "request", r.String(), "path", req.Path) break } - logger.Debug("executing query", "request", r.String()) - f, err := ds.getFrameCached(ctx, r, true) - if err != nil { - logger.Error("failed to get data", "request", r.String(), "reason", err, "path", req.Path) - f = frame.NoDataFrame(r.TimeRange.To) - } - // No data was returned by DDS yet by this and any previous request - if len(f.Fields) < 2 && len(seriesFields) == 0 { - r.Add(step) - continue - } - frame.SyncFieldNames(seriesFields, f, r.TimeRange.To) - if err := sender.SendFrame(f, data.IncludeAll); err != nil { + if err := ds.serveNextTSFrame(ctx, sender, fields, r, true); err != nil { logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil } @@ -342,40 +315,13 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe if !absolute { // Stream live data as it's being collected for { - if err := ctx.Err(); err != nil { - logger.Info("streaming stopped", "reason", err, "path", req.Path) - return nil - } - d := time.Until(r.TimeRange.To.Add(SdsDelay)) - logger.Debug("waiting for the next mintime", "duration", d.String(), "path", req.Path) - time.Sleep(d) - - f, err := ds.getFrameCached(ctx, r, true) - if err != nil { - logger.Error("failed to get data", "request", r.String(), "reason", err, "path", req.Path) - f = frame.NoDataFrame(r.TimeRange.To) - } - - t, ok := f.Fields[0].At(0).(time.Time) - if !ok || t.Before(r.TimeRange.To) { - logger.Debug("mintime is not ready yet", "path", req.Path) - time.Sleep(SdsDelay) - continue - } - // No data was returned by DDS yet by any previous request - if len(f.Fields) < 2 && len(seriesFields) == 0 { - r.Add(step) - continue - } - frame.SyncFieldNames(seriesFields, f, r.TimeRange.To) - if err := sender.SendFrame(f, data.IncludeAll); err != nil { + if err := ds.serveNextTSFrame(ctx, sender, fields, r, false); err != nil { logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil } r.Add(step) } - } - if len(seriesFields) == 0 { + } else if len(fields) == 0 { // There is no data at all, send a dummy frame without fields to reflect it in UI f := data.NewFrame("") if err := sender.SendFrame(f, data.IncludeAll); err != nil { @@ -387,54 +333,6 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe return nil } -func (ds *RMFDatasource) getFrame(ctx context.Context, r *dds.Request, wide bool) (*data.Frame, error) { - ddsResponse, err := ds.ddsClient.GetByRequest(ctx, r) - if err != nil { - return nil, err - } - headers := ds.ddsClient.GetCachedHeaders() - f, err := frame.Build(ddsResponse, headers, wide) - if err != nil { - return nil, err - } - return f, nil -} - -func (ds *RMFDatasource) getFrameCached(ctx context.Context, r *dds.Request, wide bool) (*data.Frame, error) { - logger := log.Logger.With("func", "getFrameCached") - key := cache.Key(r, wide) - - result, err, _ := ds.single.Do(string(key), func() (interface{}, error) { - f := ds.cache.GetFrame(r, wide) - // Fetch from the DDS Server and then save to cache if required. - if f == nil { - f, err := ds.getFrame(ctx, r, wide) - if err != nil { - return nil, err - } else { - // Probably the requested mintime is not ready yet, don't cache it - // We still can use it in non-timeseries views - t, ok := f.Fields[0].At(0).(time.Time) - if !ok || t.Before(r.TimeRange.To) { - return f, nil - } - if err = ds.cache.SaveFrame(f, r, wide); err != nil { - return nil, err - } - } - return f, nil - } else { - logger.Debug("cached value exists", "key", key) - } - return f, nil - }) - if result != nil { - return result.(*data.Frame), err - } else { - return nil, err - } -} - // SubscribeStream is called when a client wants to connect to a stream. This callback // allows sending the first message. func (ds *RMFDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { diff --git a/grafana/rmf-app/pkg/plugin/dds/client.go b/grafana/rmf-app/pkg/plugin/dds/client.go index f2c7833c..62a66d79 100644 --- a/grafana/rmf-app/pkg/plugin/dds/client.go +++ b/grafana/rmf-app/pkg/plugin/dds/client.go @@ -36,6 +36,7 @@ import ( const UpdateInterval = 15 * time.Minute const DefaultTimeOffset = 0 +const DefaultMinTime = 100 const IndexPath = "/gpm/index.xml" const RootPath = "/gpm/root.xml" @@ -119,7 +120,7 @@ func (c *Client) Close() { }) } -func (c *Client) GetByRequest(ctx context.Context, r *Request) (*Response, error) { +func (c *Client) GetByRequest(r *Request) (*Response, error) { path, params, err := r.pathWithParams(c.GetCachedTimeOffset()) if err != nil { return nil, err @@ -211,10 +212,12 @@ func (c *Client) updateTimeData() *TimeData { response, err := c.Get(PerformPath, "resource", ",,SYSPLEX", "id", "8D0D50") if err != nil { logger.Error("unable to fetch DDS time data", "error", err) + return nil, err } timeData := response.Reports[0].TimeData if timeData == nil { logger.Error("unable to fetch DDS time data", "error", "no time data in DDS response") + return nil, err } c.rwMutex.Lock() c.timeData = timeData @@ -230,8 +233,9 @@ func (c *Client) updateTimeData() *TimeData { func (c *Client) GetCachedMintime() time.Duration { timeData := c.ensureTimeData() - if timeData != nil { - return time.Duration(c.timeData.MinTime.Value) * time.Second + minTime := DefaultMinTime + if timeData != nil && timeData.MinTime.Value != 0 { + minTime = timeData.MinTime.Value } - return 0 + return time.Duration(minTime) * time.Second } diff --git a/grafana/rmf-app/pkg/plugin/dds/request.go b/grafana/rmf-app/pkg/plugin/dds/request.go index 19288613..c5106831 100644 --- a/grafana/rmf-app/pkg/plugin/dds/request.go +++ b/grafana/rmf-app/pkg/plugin/dds/request.go @@ -39,7 +39,10 @@ func NewRequest(res string, from time.Time, to time.Time, step time.Duration) *R func (r *Request) Align(d time.Duration) { r.TimeRange.From = r.TimeRange.From.Truncate(d) - r.TimeRange.To = r.TimeRange.To.Truncate(d).Add(d) + t := r.TimeRange.To.Truncate(d) + if t.Equal(r.TimeRange.From) || t.Before(r.TimeRange.To) { + r.TimeRange.To = t.Add(d) + } } func (r *Request) Add(d time.Duration) { diff --git a/grafana/rmf-app/pkg/plugin/frame/frame.go b/grafana/rmf-app/pkg/plugin/frame/frame.go index 7dbbe1b7..f85268d7 100644 --- a/grafana/rmf-app/pkg/plugin/frame/frame.go +++ b/grafana/rmf-app/pkg/plugin/frame/frame.go @@ -150,6 +150,7 @@ func buildLongForMetric(report *dds.Report, frameName string) *data.Frame { // iterateMetricRows parses metric key-value pairs and passes them to `process` while iterating over rows. func iterateMetricRows(report *dds.Report, defaultName string, process func(name string, value *float64)) { + colMap := map[string]bool{} var sb strings.Builder for _, jsonRow := range report.Rows { cols := jsonRow.Cols @@ -171,7 +172,12 @@ func iterateMetricRows(report *dds.Report, defaultName string, process func(name if sb.Len() == 0 { sb.WriteString(defaultName) } - process(sb.String(), parseFloat(rawValue)) + colName := sb.String() + if _, ok := colMap[colName]; ok { + continue + } + colMap[colName] = true + process(colName, parseFloat(rawValue)) } } diff --git a/grafana/rmf-app/pkg/plugin/frame/utils.go b/grafana/rmf-app/pkg/plugin/frame/utils.go index fd764904..1e0a14fd 100644 --- a/grafana/rmf-app/pkg/plugin/frame/utils.go +++ b/grafana/rmf-app/pkg/plugin/frame/utils.go @@ -18,6 +18,7 @@ package frame import ( + "errors" "strings" "time" @@ -74,3 +75,45 @@ func RemoveOldFieldNames(fieldMap SeriesFields, cutoffTime time.Time) { } } } + +func MergeInto(dst *data.Frame, src *data.Frame) (*data.Frame, error) { + if dst == nil { + dst = data.NewFrame(src.Name) + } + if src == nil { + return dst, nil + } + dstLen, err := dst.RowLen() + if err != nil { + return nil, err + } + srcLen, err := src.RowLen() + if err != nil { + return nil, err + } + for _, field2 := range src.Fields { + field1, _ := dst.FieldByName(field2.Name) + if field1 == nil { + switch field2.Type() { + case data.FieldTypeTime: + field1 = data.NewField(field2.Name, field2.Labels, make([]time.Time, dstLen)) + case data.FieldTypeNullableFloat64: + field1 = data.NewField(field2.Name, field2.Labels, make([]*float64, dstLen)) + default: + return nil, errors.New("unsupported field type") + } + dst.Fields = append(dst.Fields, field1) + } + for i := range srcLen { + field1.Append(field2.At(i)) + } + } + for _, field1 := range dst.Fields { + if field2, _ := src.FieldByName(field1.Name); field2 == nil { + for range srcLen { + field1.Append(nil) + } + } + } + return dst, nil +} diff --git a/grafana/rmf-app/pkg/plugin/query.go b/grafana/rmf-app/pkg/plugin/query.go new file mode 100644 index 00000000..b264f3e6 --- /dev/null +++ b/grafana/rmf-app/pkg/plugin/query.go @@ -0,0 +1,188 @@ +/** +* (C) Copyright IBM Corp. 2023, 2025. +* (C) Copyright Rocket Software, Inc. 2023-2025. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package plugin + +import ( + "context" + "time" + + "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/cache" + "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/dds" + "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame" + "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/log" + "github.com/google/uuid" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana-plugin-sdk-go/live" +) + +func (ds *RMFDatasource) getFrame(r *dds.Request, wide bool) (*data.Frame, error) { + ddsResponse, err := ds.ddsClient.GetByRequest(r) + if err != nil { + return nil, err + } + headers := ds.ddsClient.GetCachedHeaders() + f, err := frame.Build(ddsResponse, headers, wide) + if err != nil { + return nil, err + } + return f, nil +} + +func (ds *RMFDatasource) getFrameCached(r *dds.Request, wide bool) (*data.Frame, error) { + logger := log.Logger.With("func", "getFrameCached") + key := cache.FrameKey(r, wide) + + result, err, _ := ds.single.Do(string(key), func() (interface{}, error) { + f := ds.frameCache.Get(r, wide) + // Fetch from the DDS Server and then save to cache if required. + if f == nil { + f, err := ds.getFrame(r, wide) + if err != nil { + return nil, err + } else { + // Probably the requested mintime is not ready yet, don't cache it + // We still can use it in non-timeseries views + t, ok := f.Fields[0].At(0).(time.Time) + if !ok || t.Before(r.TimeRange.To) { + return f, nil + } + if err = ds.frameCache.Set(f, r, wide); err != nil { + return nil, err + } + } + return f, nil + } else { + logger.Debug("cached value exists", "key", key) + logger.Warn("cached value exists", "key", r.String()) + } + return f, nil + }) + if result != nil { + return result.(*data.Frame), err + } else { + return nil, err + } +} + +// getStep calculates the most appropriate time series step. +// There's no ideal solution. We assume that it aligns with one hour. +// If it doesn't, streaming will still work, but some queries will miss cache. +func getStep(mintime time.Duration, limit time.Duration) time.Duration { + n := 3600 / int(mintime.Seconds()) + step := time.Hour // The maximum possible + for i := 1; i <= n; i++ { + if n%i == 0 && time.Duration(i)*mintime >= limit { + step = time.Duration(i) * mintime + break + } + } + return step +} + +type RequestParams struct { + Resource struct { + Value string `json:"value"` + } `json:"selectedResource"` + AbsoluteTime bool `json:"absoluteTimeSelected"` + VisType string `json:"selectedVisualisationType"` +} + +func (ds *RMFDatasource) getFirstTSFrame(params *RequestParams, tr backend.TimeRange, step time.Duration) (*data.Frame, error) { + var ( + f *data.Frame + err error + ) + res := params.Resource.Value + absolute := params.AbsoluteTime + + r := dds.NewRequest(res, tr.From, tr.From, step) + fields := frame.SeriesFields{} + for !absolute && r.TimeRange.To.Before(time.Now().Add(-SdsDelay)) || absolute && r.TimeRange.To.Before(tr.To) { + next := ds.frameCache.Get(r, true) + if next == nil { + break + } + frame.SyncFieldNames(fields, next, r.TimeRange.To) + f, err = frame.MergeInto(f, next) + if err != nil { + return nil, err + } + r.Add(step) + } + if f == nil { + f = frame.TaggedFrame(tr.From, "No data yet...") + } + channel := live.Channel{ + Scope: live.ScopeDatasource, + Namespace: ds.uid, + Path: uuid.NewString(), + } + cachedChannel := cache.Channel{ + Resource: res, + TimeRange: backend.TimeRange{From: r.TimeRange.From, To: tr.To}, + Absolute: absolute, + Step: step, + Fields: fields, + } + if err = ds.channelCache.Set(channel.Path, &cachedChannel); err != nil { + return nil, err + } + f.SetMeta(&data.FrameMeta{Channel: channel.String()}) + return f, nil +} + +func (ds *RMFDatasource) serveNextTSFrame(ctx context.Context, sender *backend.StreamSender, fields frame.SeriesFields, r *dds.Request, hist bool) error { + logger := log.Logger.With("func", "queryNextTSFrame") + var f *data.Frame + var err error + + for { + if err = ctx.Err(); err != nil { + return err + } + if !hist { + d := time.Until(r.TimeRange.To.Add(SdsDelay)) + time.Sleep(d) + } + logger.Debug("executing query", "request", r.String()) + f, err = ds.getFrameCached(r, true) + if err != nil { + logger.Error("failed to get data", "request", r.String(), "reason", err) + f = frame.NoDataFrame(r.TimeRange.To) + } + if !hist { + t, ok := f.Fields[0].At(0).(time.Time) + if !ok || t.Before(r.TimeRange.To) { + logger.Debug("mintime is not ready yet") + time.Sleep(SdsDelay) + continue + } + } + break + } + // No data was returned by DDS yet by any previous request + if len(f.Fields) < 2 && len(fields) == 0 { + return nil + } + frame.SyncFieldNames(fields, f, r.TimeRange.To) + if err := sender.SendFrame(f, data.IncludeAll); err != nil { + return err + } + return nil +}