diff --git a/grafana/rmf-app/pkg/plugin/cache/frame.go b/grafana/rmf-app/pkg/plugin/cache/frame.go index ea903afa..2d25af8b 100644 --- a/grafana/rmf-app/pkg/plugin/cache/frame.go +++ b/grafana/rmf-app/pkg/plugin/cache/frame.go @@ -49,11 +49,16 @@ func FrameKey(r *dds.Request, wide bool) []byte { } func (fc *FrameCache) Get(r *dds.Request, wide bool) *data.Frame { - logger := log.Logger.With("func", "GetFrame") + logger := log.Logger.With("func", "FrameCache.Get") + defer log.LogAndRecover(logger) + var frame data.Frame key := FrameKey(r, wide) buf := fc.cache.GetBig(nil, key) if buf != nil { + // FIXME + // Sometimes it causes panic: "runtime error: index out of range [21] with length 21" and similar + // It's a Grafana SDK bug. err := json.Unmarshal(buf, &frame) if err != nil { logger.Error("Unmarshal error", "err", err, "key", key) diff --git a/grafana/rmf-app/pkg/plugin/datasource.go b/grafana/rmf-app/pkg/plugin/datasource.go index 790ed575..3cb980dc 100644 --- a/grafana/rmf-app/pkg/plugin/datasource.go +++ b/grafana/rmf-app/pkg/plugin/datasource.go @@ -27,13 +27,16 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana-plugin-sdk-go/live" "golang.org/x/sync/singleflight" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/cache" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/dds" + "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/log" ) @@ -181,6 +184,14 @@ func (ds *RMFDatasource) CallResource(ctx context.Context, req *backend.CallReso } } +type RequestParams struct { + Resource struct { + Value string `json:"value"` + } `json:"selectedResource"` + AbsoluteTime bool `json:"absoluteTimeSelected"` + VisType string `json:"selectedVisualisationType"` +} + // QueryData handles multiple queries and returns multiple responses. 
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier). // The QueryDataResponse contains a map of RefID to the response for each query, and each response @@ -234,10 +245,30 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe if params.VisType == TimeSeriesType { // Initialize time series stream step := getStep(mintime, q.Interval) - f, err := ds.getFirstTSFrame(&params, q.TimeRange, step) + fields := frame.SeriesFields{} + start := q.TimeRange.From + r := dds.NewRequest(params.Resource.Value, start, start, step) + f, jump, err := ds.getCachedTSFrames(r, q.TimeRange.To, step, fields) + if f == nil || err != nil { + f = frame.TaggedFrame(q.TimeRange.From, "No data yet...") + } + channel := live.Channel{ + Scope: live.ScopeDatasource, + Namespace: ds.uid, + Path: uuid.NewString(), + } + cachedChannel := cache.Channel{ + Resource: params.Resource.Value, + TimeRange: backend.TimeRange{From: start.Add(jump), To: q.TimeRange.To}, + Absolute: params.AbsoluteTime, + Step: step, + Fields: fields, + } + err = ds.channelCache.Set(channel.Path, &cachedChannel) if err != nil { response = &backend.DataResponse{Status: backend.StatusInternal, Error: err} } else { + f.SetMeta(&data.FrameMeta{Channel: channel.String()}) response = &backend.DataResponse{Frames: data.Frames{f}} } } else { @@ -287,6 +318,7 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe logger.Error("unable to find channel", "err", err) return nil } + res := c.Resource step := c.Step absolute := c.Absolute from := c.TimeRange.From @@ -294,19 +326,32 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe fields := c.Fields logger.Debug("starting streaming", "step", step.String(), "path", req.Path) - r := dds.NewRequest(c.Resource, from, from, step) + r := dds.NewRequest(res, from, from, step) // Stream historical part of time series - stopTime := to + stop := to for { if !absolute { - 
stopTime = time.Now().Add(-SdsDelay) + stop = time.Now().Add(-SdsDelay) } - if r.TimeRange.To.After(stopTime) { + if r.TimeRange.To.After(stop) { logger.Debug("finished with historical data", "request", r.String(), "path", req.Path) break } - if err := ds.serveNextTSFrame(ctx, sender, fields, r, true); err != nil { + f, jump, err := ds.getCachedTSFrames(r, stop, step, fields) + if err != nil { + logger.Info("streaming stopped", "reason", err, "path", req.Path) + return nil + } + if f != nil { + if err := sender.SendFrame(f, data.IncludeAll); err != nil { + logger.Info("streaming stopped", "reason", err, "path", req.Path) + return nil + } + r.Add(jump) + continue + } + if err := ds.serveTSFrame(ctx, sender, fields, r, true); err != nil { logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil } @@ -315,7 +360,7 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe if !absolute { // Stream live data as it's being collected for { - if err := ds.serveNextTSFrame(ctx, sender, fields, r, false); err != nil { + if err := ds.serveTSFrame(ctx, sender, fields, r, false); err != nil { logger.Info("streaming stopped", "reason", err, "path", req.Path) return nil } diff --git a/grafana/rmf-app/pkg/plugin/query.go b/grafana/rmf-app/pkg/plugin/query.go index b264f3e6..49e79f21 100644 --- a/grafana/rmf-app/pkg/plugin/query.go +++ b/grafana/rmf-app/pkg/plugin/query.go @@ -25,51 +25,21 @@ import ( "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/dds" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame" "github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/log" - "github.com/google/uuid" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana-plugin-sdk-go/live" ) func (ds *RMFDatasource) getFrame(r *dds.Request, wide bool) (*data.Frame, error) { - ddsResponse, err := ds.ddsClient.GetByRequest(r) - if err != nil { - return nil, err - } - headers := 
ds.ddsClient.GetCachedHeaders() - f, err := frame.Build(ddsResponse, headers, wide) - if err != nil { - return nil, err - } - return f, nil -} - -func (ds *RMFDatasource) getFrameCached(r *dds.Request, wide bool) (*data.Frame, error) { - logger := log.Logger.With("func", "getFrameCached") key := cache.FrameKey(r, wide) - result, err, _ := ds.single.Do(string(key), func() (interface{}, error) { - f := ds.frameCache.Get(r, wide) - // Fetch from the DDS Server and then save to cache if required. - if f == nil { - f, err := ds.getFrame(r, wide) - if err != nil { - return nil, err - } else { - // Probably the requested mintime is not ready yet, don't cache it - // We still can use it in non-timeseries views - t, ok := f.Fields[0].At(0).(time.Time) - if !ok || t.Before(r.TimeRange.To) { - return f, nil - } - if err = ds.frameCache.Set(f, r, wide); err != nil { - return nil, err - } - } - return f, nil - } else { - logger.Debug("cached value exists", "key", key) - logger.Warn("cached value exists", "key", r.String()) + ddsResponse, err := ds.ddsClient.GetByRequest(r) + if err != nil { + return nil, err + } + headers := ds.ddsClient.GetCachedHeaders() + f, err := frame.Build(ddsResponse, headers, wide) + if err != nil { + return nil, err } return f, nil }) @@ -95,60 +65,34 @@ func getStep(mintime time.Duration, limit time.Duration) time.Duration { return step } -type RequestParams struct { - Resource struct { - Value string `json:"value"` - } `json:"selectedResource"` - AbsoluteTime bool `json:"absoluteTimeSelected"` - VisType string `json:"selectedVisualisationType"` -} - -func (ds *RMFDatasource) getFirstTSFrame(params *RequestParams, tr backend.TimeRange, step time.Duration) (*data.Frame, error) { +// getCachedTSFrames fetches all sequential time series frames from cache and merges them into one frame. +// It has to synchronize the time series field set passed in `fields`. 
+func (ds *RMFDatasource) getCachedTSFrames(r *dds.Request, stop time.Time, step time.Duration, fields frame.SeriesFields) (*data.Frame, time.Duration, error) { var ( - f *data.Frame - err error + f *data.Frame + jump time.Duration + err error ) - res := params.Resource.Value - absolute := params.AbsoluteTime - - r := dds.NewRequest(res, tr.From, tr.From, step) - fields := frame.SeriesFields{} - for !absolute && r.TimeRange.To.Before(time.Now().Add(-SdsDelay)) || absolute && r.TimeRange.To.Before(tr.To) { - next := ds.frameCache.Get(r, true) + // Create a copy of the original request - don't alter it + cr := dds.NewRequest(r.Resource, r.TimeRange.From, r.TimeRange.To, step) + for r.TimeRange.To.Before(stop) { + next := ds.frameCache.Get(cr, true) if next == nil { break } frame.SyncFieldNames(fields, next, r.TimeRange.To) f, err = frame.MergeInto(f, next) if err != nil { - return nil, err + return nil, jump, err } - r.Add(step) + cr.Add(step) + jump += step } - if f == nil { - f = frame.TaggedFrame(tr.From, "No data yet...") - } - channel := live.Channel{ - Scope: live.ScopeDatasource, - Namespace: ds.uid, - Path: uuid.NewString(), - } - cachedChannel := cache.Channel{ - Resource: res, - TimeRange: backend.TimeRange{From: r.TimeRange.From, To: tr.To}, - Absolute: absolute, - Step: step, - Fields: fields, - } - if err = ds.channelCache.Set(channel.Path, &cachedChannel); err != nil { - return nil, err - } - f.SetMeta(&data.FrameMeta{Channel: channel.String()}) - return f, nil + return f, jump, err } -func (ds *RMFDatasource) serveNextTSFrame(ctx context.Context, sender *backend.StreamSender, fields frame.SeriesFields, r *dds.Request, hist bool) error { - logger := log.Logger.With("func", "queryNextTSFrame") +func (ds *RMFDatasource) serveTSFrame(ctx context.Context, sender *backend.StreamSender, fields frame.SeriesFields, r *dds.Request, hist bool) error { + logger := log.Logger.With("func", "serveTSFrame") var f *data.Frame var err error @@ -158,20 +102,25 @@ func 
(ds *RMFDatasource) serveNextTSFrame(ctx context.Context, sender *backend.S } if !hist { d := time.Until(r.TimeRange.To.Add(SdsDelay)) + logger.Debug("sleeping", "request", r.String(), "duration", d.String()) time.Sleep(d) } logger.Debug("executing query", "request", r.String()) - f, err = ds.getFrameCached(r, true) + f, err = ds.getFrame(r, true) if err != nil { logger.Error("failed to get data", "request", r.String(), "reason", err) f = frame.NoDataFrame(r.TimeRange.To) - } - if !hist { - t, ok := f.Fields[0].At(0).(time.Time) - if !ok || t.Before(r.TimeRange.To) { - logger.Debug("mintime is not ready yet") - time.Sleep(SdsDelay) - continue + } else { + if !hist { + t, ok := f.Fields[0].At(0).(time.Time) + if !ok || t.Before(r.TimeRange.To) { + logger.Debug("mintime is not ready yet") + time.Sleep(SdsDelay) + continue + } + } + if err := ds.frameCache.Set(f, r, true); err != nil { + logger.Error("failed to save data in cache", "request", r.String(), "reason", err) } } break