Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion grafana/rmf-app/pkg/plugin/cache/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ func FrameKey(r *dds.Request, wide bool) []byte {
}

func (fc *FrameCache) Get(r *dds.Request, wide bool) *data.Frame {
logger := log.Logger.With("func", "GetFrame")
logger := log.Logger.With("func", "FrameCache.Get")
defer log.LogAndRecover(logger)

var frame data.Frame
key := FrameKey(r, wide)
buf := fc.cache.GetBig(nil, key)
if buf != nil {
// FIXME
// Sometimes it causes panic: "runtime error: index out of range [21] with length 21" and similar
// It's a Grafana SDK bug.
err := json.Unmarshal(buf, &frame)
if err != nil {
logger.Error("Unmarshal error", "err", err, "key", key)
Expand Down
59 changes: 52 additions & 7 deletions grafana/rmf-app/pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"golang.org/x/sync/singleflight"

"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/cache"
"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/dds"
"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame"
"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/log"
)

Expand Down Expand Up @@ -181,6 +184,14 @@ func (ds *RMFDatasource) CallResource(ctx context.Context, req *backend.CallReso
}
}

// RequestParams holds the per-query parameters decoded from the panel's
// JSON model (see the json tags for the frontend field names).
type RequestParams struct {
	// Resource is the selected RMF resource; only its string value is
	// consumed by the backend.
	Resource struct {
		Value string `json:"value"`
	} `json:"selectedResource"`
	// AbsoluteTime reports whether an absolute time range was selected.
	AbsoluteTime bool `json:"absoluteTimeSelected"`
	// VisType is the selected visualisation type — compared against
	// TimeSeriesType elsewhere in this package to pick the query path.
	VisType string `json:"selectedVisualisationType"`
}

// QueryData handles multiple queries and returns multiple responses.
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier).
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
Expand Down Expand Up @@ -234,10 +245,30 @@ func (ds *RMFDatasource) QueryData(ctx context.Context, req *backend.QueryDataRe
if params.VisType == TimeSeriesType {
// Initialize time series stream
step := getStep(mintime, q.Interval)
f, err := ds.getFirstTSFrame(&params, q.TimeRange, step)
fields := frame.SeriesFields{}
start := q.TimeRange.From
r := dds.NewRequest(params.Resource.Value, start, start, step)
f, jump, err := ds.getCachedTSFrames(r, q.TimeRange.To, step, fields)
if f == nil || err != nil {
f = frame.TaggedFrame(q.TimeRange.From, "No data yet...")
}
channel := live.Channel{
Scope: live.ScopeDatasource,
Namespace: ds.uid,
Path: uuid.NewString(),
}
cachedChannel := cache.Channel{
Resource: params.Resource.Value,
TimeRange: backend.TimeRange{From: start.Add(jump), To: q.TimeRange.To},
Absolute: params.AbsoluteTime,
Step: step,
Fields: fields,
}
err = ds.channelCache.Set(channel.Path, &cachedChannel)
if err != nil {
response = &backend.DataResponse{Status: backend.StatusInternal, Error: err}
} else {
f.SetMeta(&data.FrameMeta{Channel: channel.String()})
response = &backend.DataResponse{Frames: data.Frames{f}}
}
} else {
Expand Down Expand Up @@ -287,26 +318,40 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe
logger.Error("unable to find channel", "err", err)
return nil
}
res := c.Resource
step := c.Step
absolute := c.Absolute
from := c.TimeRange.From
to := c.TimeRange.To
fields := c.Fields

logger.Debug("starting streaming", "step", step.String(), "path", req.Path)
r := dds.NewRequest(c.Resource, from, from, step)
r := dds.NewRequest(res, from, from, step)

// Stream historical part of time series
stopTime := to
stop := to
for {
if !absolute {
stopTime = time.Now().Add(-SdsDelay)
stop = time.Now().Add(-SdsDelay)
}
if r.TimeRange.To.After(stopTime) {
if r.TimeRange.To.After(stop) {
logger.Debug("finished with historical data", "request", r.String(), "path", req.Path)
break
}
if err := ds.serveNextTSFrame(ctx, sender, fields, r, true); err != nil {
f, jump, err := ds.getCachedTSFrames(r, stop, step, fields)
if err != nil {
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
}
if f != nil {
if err := sender.SendFrame(f, data.IncludeAll); err != nil {
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
}
r.Add(jump)
continue
}
if err := ds.serveTSFrame(ctx, sender, fields, r, true); err != nil {
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
}
Expand All @@ -315,7 +360,7 @@ func (ds *RMFDatasource) RunStream(ctx context.Context, req *backend.RunStreamRe
if !absolute {
// Stream live data as it's being collected
for {
if err := ds.serveNextTSFrame(ctx, sender, fields, r, false); err != nil {
if err := ds.serveTSFrame(ctx, sender, fields, r, false); err != nil {
logger.Info("streaming stopped", "reason", err, "path", req.Path)
return nil
}
Expand Down
125 changes: 37 additions & 88 deletions grafana/rmf-app/pkg/plugin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,21 @@ import (
"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/dds"
"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/frame"
"github.com/IBM/RMF/grafana/rmf-app/pkg/plugin/log"
"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
)

// getFrame queries the DDS server for the given request and builds a
// Grafana data frame from the response, using the client's cached headers.
// It returns a nil frame whenever the fetch or the build step fails.
func (ds *RMFDatasource) getFrame(r *dds.Request, wide bool) (*data.Frame, error) {
	// Fetch raw data for the requested resource and time range.
	resp, err := ds.ddsClient.GetByRequest(r)
	if err != nil {
		return nil, err
	}
	// Assemble the frame; column metadata comes from the cached headers.
	built, err := frame.Build(resp, ds.ddsClient.GetCachedHeaders(), wide)
	if err != nil {
		return nil, err
	}
	return built, nil
}

func (ds *RMFDatasource) getFrameCached(r *dds.Request, wide bool) (*data.Frame, error) {
logger := log.Logger.With("func", "getFrameCached")
key := cache.FrameKey(r, wide)

result, err, _ := ds.single.Do(string(key), func() (interface{}, error) {
f := ds.frameCache.Get(r, wide)
// Fetch from the DDS Server and then save to cache if required.
if f == nil {
f, err := ds.getFrame(r, wide)
if err != nil {
return nil, err
} else {
// Probably the requested mintime is not ready yet, don't cache it
// We still can use it in non-timeseries views
t, ok := f.Fields[0].At(0).(time.Time)
if !ok || t.Before(r.TimeRange.To) {
return f, nil
}
if err = ds.frameCache.Set(f, r, wide); err != nil {
return nil, err
}
}
return f, nil
} else {
logger.Debug("cached value exists", "key", key)
logger.Warn("cached value exists", "key", r.String())
ddsResponse, err := ds.ddsClient.GetByRequest(r)
if err != nil {
return nil, err
}
headers := ds.ddsClient.GetCachedHeaders()
f, err := frame.Build(ddsResponse, headers, wide)
if err != nil {
return nil, err
}
return f, nil
})
Expand All @@ -95,60 +65,34 @@ func getStep(mintime time.Duration, limit time.Duration) time.Duration {
return step
}

type RequestParams struct {
Resource struct {
Value string `json:"value"`
} `json:"selectedResource"`
AbsoluteTime bool `json:"absoluteTimeSelected"`
VisType string `json:"selectedVisualisationType"`
}

func (ds *RMFDatasource) getFirstTSFrame(params *RequestParams, tr backend.TimeRange, step time.Duration) (*data.Frame, error) {
// getCachedTSFrames fetches all sequential time series frames from the cache and merges them into one frame.
// It has to synchronize the time series field set passed in `fields`.
func (ds *RMFDatasource) getCachedTSFrames(r *dds.Request, stop time.Time, step time.Duration, fields frame.SeriesFields) (*data.Frame, time.Duration, error) {
var (
f *data.Frame
err error
f *data.Frame
jump time.Duration
err error
)
res := params.Resource.Value
absolute := params.AbsoluteTime

r := dds.NewRequest(res, tr.From, tr.From, step)
fields := frame.SeriesFields{}
for !absolute && r.TimeRange.To.Before(time.Now().Add(-SdsDelay)) || absolute && r.TimeRange.To.Before(tr.To) {
next := ds.frameCache.Get(r, true)
// Create a copy of the original request - don't alter it
cr := dds.NewRequest(r.Resource, r.TimeRange.From, r.TimeRange.To, step)
for r.TimeRange.To.Before(stop) {
next := ds.frameCache.Get(cr, true)
if next == nil {
break
}
frame.SyncFieldNames(fields, next, r.TimeRange.To)
f, err = frame.MergeInto(f, next)
if err != nil {
return nil, err
return nil, jump, err
}
r.Add(step)
cr.Add(step)
jump += step
}
if f == nil {
f = frame.TaggedFrame(tr.From, "No data yet...")
}
channel := live.Channel{
Scope: live.ScopeDatasource,
Namespace: ds.uid,
Path: uuid.NewString(),
}
cachedChannel := cache.Channel{
Resource: res,
TimeRange: backend.TimeRange{From: r.TimeRange.From, To: tr.To},
Absolute: absolute,
Step: step,
Fields: fields,
}
if err = ds.channelCache.Set(channel.Path, &cachedChannel); err != nil {
return nil, err
}
f.SetMeta(&data.FrameMeta{Channel: channel.String()})
return f, nil
return f, jump, err
}

func (ds *RMFDatasource) serveNextTSFrame(ctx context.Context, sender *backend.StreamSender, fields frame.SeriesFields, r *dds.Request, hist bool) error {
logger := log.Logger.With("func", "queryNextTSFrame")
func (ds *RMFDatasource) serveTSFrame(ctx context.Context, sender *backend.StreamSender, fields frame.SeriesFields, r *dds.Request, hist bool) error {
logger := log.Logger.With("func", "serveTSFrame")
var f *data.Frame
var err error

Expand All @@ -158,20 +102,25 @@ func (ds *RMFDatasource) serveNextTSFrame(ctx context.Context, sender *backend.S
}
if !hist {
d := time.Until(r.TimeRange.To.Add(SdsDelay))
logger.Debug("sleeping", "request", r.String(), "duration", d.String())
time.Sleep(d)
}
logger.Debug("executing query", "request", r.String())
f, err = ds.getFrameCached(r, true)
f, err = ds.getFrame(r, true)
if err != nil {
logger.Error("failed to get data", "request", r.String(), "reason", err)
f = frame.NoDataFrame(r.TimeRange.To)
}
if !hist {
t, ok := f.Fields[0].At(0).(time.Time)
if !ok || t.Before(r.TimeRange.To) {
logger.Debug("mintime is not ready yet")
time.Sleep(SdsDelay)
continue
} else {
if !hist {
t, ok := f.Fields[0].At(0).(time.Time)
if !ok || t.Before(r.TimeRange.To) {
logger.Debug("mintime is not ready yet")
time.Sleep(SdsDelay)
continue
}
}
if err := ds.frameCache.Set(f, r, true); err != nil {
logger.Error("failed to save data in cache", "request", r.String(), "reason", err)
}
}
break
Expand Down
Loading