diff --git a/loki-api/README.md b/loki-api/README.md new file mode 100644 index 0000000..d7011e6 --- /dev/null +++ b/loki-api/README.md @@ -0,0 +1,23 @@ +# loki-api + +A Rust wrapper around Grafana Loki's Protocol Buffer definitions. + +## Overview + +This crate provides Rust bindings for the Protocol Buffer definitions used in [Grafana Loki](https://github.com/grafana/loki). It uses [prost](https://github.com/tokio-rs/prost) to generate Rust code from the Protocol Buffer definitions. + +## Structure + +- `generate/` - Contains the original Protocol Buffer definitions from Grafana Loki +- `src/` - Contains the Rust code, including: + - Generated Rust bindings for the Protocol Buffer definitions + - Re-exports of `prost` and `prost_types` for convenience + +## License + +This project is licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](../LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](../LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. \ No newline at end of file diff --git a/loki-api/generate/proto/logproto.proto b/loki-api/generate/proto/logproto.proto index 7270601..b015658 100644 --- a/loki-api/generate/proto/logproto.proto +++ b/loki-api/generate/proto/logproto.proto @@ -2,54 +2,158 @@ syntax = "proto3"; package logproto; -option go_package = "github.com/grafana/loki/pkg/logproto"; - -import "google/protobuf/timestamp.proto"; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "google/protobuf/timestamp.proto"; import "pkg/logqlmodel/stats/stats.proto"; +import "pkg/push/push.proto"; +import "pkg/storage/chunk/cache/resultscache/types.proto"; -service Pusher { - rpc Push(PushRequest) returns (PushResponse) {}; -} +option go_package = "github.com/grafana/loki/v3/pkg/logproto"; service Querier { - rpc Query(QueryRequest) returns (stream QueryResponse) {}; - rpc QuerySample(SampleQueryRequest) returns (stream SampleQueryResponse) {}; - rpc Label(LabelRequest) returns (LabelResponse) {}; - rpc Tail(TailRequest) returns (stream TailResponse) {}; - rpc Series(SeriesRequest) returns (SeriesResponse) {}; - rpc TailersCount(TailersCountRequest) returns (TailersCountResponse) {}; - rpc GetChunkIDs(GetChunkIDsRequest) returns (GetChunkIDsResponse) {}; // GetChunkIDs returns ChunkIDs from the index store holding logs for given selectors and time-range. + rpc Query(QueryRequest) returns (stream QueryResponse) {} + + rpc QuerySample(SampleQueryRequest) returns (stream SampleQueryResponse) {} + + rpc Label(LabelRequest) returns (LabelResponse) {} + + rpc Tail(TailRequest) returns (stream TailResponse) {} + + rpc Series(SeriesRequest) returns (SeriesResponse) {} + + rpc TailersCount(TailersCountRequest) returns (TailersCountResponse) {} + + rpc GetChunkIDs(GetChunkIDsRequest) returns (GetChunkIDsResponse) {} + + // Note: this MUST be the same as the variant defined in + // indexgateway.proto on the IndexGateway service. + rpc GetStats(IndexStatsRequest) returns (IndexStatsResponse) {} + // Note: this MUST be the same as the variant defined in + // indexgateway.proto on the IndexGateway service. + rpc GetVolume(VolumeRequest) returns (VolumeResponse) {} + + rpc GetDetectedFields(DetectedFieldsRequest) returns (DetectedFieldsResponse) {} + + rpc GetDetectedLabels(DetectedLabelsRequest) returns (LabelToValuesResponse) {} +} + +message LabelToValuesResponse { + map labels = 1; +} + +message UniqueLabelValues { + repeated string values = 1; +} + +service StreamData { + rpc GetStreamRates(StreamRatesRequest) returns (StreamRatesResponse) {} +} + +message StreamRatesRequest {} + +message StreamRatesResponse { + repeated StreamRate streamRates = 1; +} + +message StreamMetadata { + uint64 streamHash = 1; + uint64 entriesSize = 2; + uint64 structuredMetadataSize = 3; +} + +service IngestLimitsFrontend { + rpc ExceedsLimits(ExceedsLimitsRequest) returns (ExceedsLimitsResponse) {} +} + +message ExceedsLimitsRequest { + string tenant = 1; + repeated StreamMetadata streams = 2; +} + +message ExceedsLimitsResponse { + string tenant = 1; + repeated RejectedStream rejectedStreams = 2; } -service Ingester { - rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {}; +message RejectedStream { + uint64 streamHash = 1; + string reason = 2; } -message PushRequest { - repeated StreamAdapter streams = 1 [(gogoproto.jsontag) = "streams", (gogoproto.customtype) = "Stream"]; +service IngestLimits { + rpc GetStreamUsage(GetStreamUsageRequest) returns (GetStreamUsageResponse) {} + rpc GetAssignedPartitions(GetAssignedPartitionsRequest) returns (GetAssignedPartitionsResponse) {} } -message PushResponse { +message GetStreamUsageRequest { + string tenant = 1; + repeated int32 partitions = 2; + repeated uint64 streamHashes = 3; +} + +message GetStreamUsageResponse { + string tenant = 1; + uint64 activeStreams = 2; + int64 rate = 3; + repeated uint64 unknownStreams = 4; +} + +message StreamRate { + uint64 streamHash = 1; + uint64 streamHashNoShard = 2; + int64 rate = 3; // rate in plain bytes. + string tenant = 4; + uint32 pushes = 5; +} + +message GetAssignedPartitionsRequest {} + +message GetAssignedPartitionsResponse { + map assignedPartitions = 1; } message QueryRequest { - string selector = 1; + string selector = 1 [deprecated = true]; uint32 limit = 2; - google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp start = 3 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 4 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; Direction direction = 5; reserved 6; repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"]; repeated Delete deletes = 8; + Plan plan = 9 [(gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/querier/plan.QueryPlan"]; + // If populated, these represent the chunk references that the querier should + // use to fetch the data, plus any other chunks reported by ingesters. + ChunkRefGroup storeChunks = 10 [(gogoproto.jsontag) = "storeChunks"]; } message SampleQueryRequest { - string selector = 1; - google.protobuf.Timestamp start = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - google.protobuf.Timestamp end = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + string selector = 1 [deprecated = true]; // mark as reserved once we've fully migrated to plan. + google.protobuf.Timestamp start = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 3 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"]; repeated Delete deletes = 5; + Plan plan = 6 [(gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/querier/plan.QueryPlan"]; + // If populated, these represent the chunk references that the querier should + // use to fetch the data, plus any other chunks reported by ingesters. + ChunkRefGroup storeChunks = 10 [(gogoproto.jsontag) = "storeChunks"]; +} + +// TODO(owen-d): fix. This will break rollouts as soon as the internal repr is changed. +message Plan { + bytes raw = 1; } message Delete { @@ -59,46 +163,46 @@ message Delete { } message QueryResponse { - repeated StreamAdapter streams = 1 [(gogoproto.customtype) = "Stream", (gogoproto.nullable) = true]; + repeated StreamAdapter streams = 1 [ + (gogoproto.customtype) = "github.com/grafana/loki/pkg/push.Stream", + (gogoproto.nullable) = true + ]; stats.Ingester stats = 2 [(gogoproto.nullable) = false]; + repeated string warnings = 3; } message SampleQueryResponse { - repeated Series series = 1 [(gogoproto.customtype) = "Series", (gogoproto.nullable) = true]; + repeated Series series = 1 [ + (gogoproto.customtype) = "Series", + (gogoproto.nullable) = true + ]; stats.Ingester stats = 2 [(gogoproto.nullable) = false]; + repeated string warnings = 3; } - enum Direction { FORWARD = 0; BACKWARD = 1; } - - message LabelRequest { string name = 1; bool values = 2; // True to fetch label values, false for fetch labels names. - google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true]; - google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = true]; + google.protobuf.Timestamp start = 3 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = true + ]; + google.protobuf.Timestamp end = 4 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = true + ]; + string query = 5; // Naming this query instead of match because this should be with queryrangebase.Request interface } message LabelResponse { repeated string values = 1; } -message StreamAdapter { - string labels = 1 [(gogoproto.jsontag) = "labels"]; - repeated EntryAdapter entries = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "entries"]; - // hash contains the original hash of the stream. - uint64 hash = 3 [(gogoproto.jsontag) = "-"]; -} - -message EntryAdapter { - google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false, (gogoproto.jsontag) = "ts"]; - string line = 2 [(gogoproto.jsontag) = "line"]; -} - message Sample { int64 timestamp = 1 [(gogoproto.jsontag) = "ts"]; double value = 2 [(gogoproto.jsontag) = "value"]; @@ -107,32 +211,45 @@ message Sample { // LegacySample exists for backwards compatibility reasons and is deprecated. Do not use. message LegacySample { - double value = 1; + double value = 1; int64 timestamp_ms = 2; } message Series { string labels = 1 [(gogoproto.jsontag) = "labels"]; - repeated Sample samples = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "samples"]; + repeated Sample samples = 2 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "samples" + ]; uint64 streamHash = 3 [(gogoproto.jsontag) = "streamHash"]; } message TailRequest { - string query = 1; + string query = 1 [deprecated = true]; reserved 2; uint32 delayFor = 3; uint32 limit = 4; - google.protobuf.Timestamp start = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp start = 5 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + Plan plan = 6 [(gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/querier/plan.QueryPlan"]; } message TailResponse { - StreamAdapter stream = 1 [(gogoproto.customtype) = "Stream"]; + StreamAdapter stream = 1 [(gogoproto.customtype) = "github.com/grafana/loki/pkg/push.Stream"]; repeated DroppedStream droppedStreams = 2; } message SeriesRequest { - google.protobuf.Timestamp start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - google.protobuf.Timestamp end = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp start = 1 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; repeated string groups = 3; repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"]; } @@ -142,28 +259,32 @@ message SeriesResponse { } message SeriesIdentifier { - map labels = 1; + message LabelsEntry { + string key = 1; + string value = 2; + } + repeated LabelsEntry labels = 1 [(gogoproto.nullable) = false]; } message DroppedStream { - google.protobuf.Timestamp from = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp from = 1 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp to = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; string labels = 3; } -message TimeSeriesChunk { - string from_ingester_id = 1; - string user_id = 2; - repeated LabelPair labels = 3; - repeated Chunk chunks = 4; -} - message LabelPair { string name = 1; string value = 2; } // LegacyLabelPair exists for backwards compatibility reasons and is deprecated. Do not use. +// Use LabelPair instead. message LegacyLabelPair { bytes name = 1; bytes value = 2; @@ -173,13 +294,7 @@ message Chunk { bytes data = 1; } -message TransferChunksResponse { - -} - -message TailersCountRequest { - -} +message TailersCountRequest {} message TailersCountResponse { uint32 count = 1; @@ -187,10 +302,253 @@ message TailersCountResponse { message GetChunkIDsRequest { string matchers = 1; - google.protobuf.Timestamp start = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - google.protobuf.Timestamp end = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp start = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 3 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; } message GetChunkIDsResponse { repeated string chunkIDs = 1; } + +// ChunkRef contains the metadata to reference a Chunk. +// It is embedded by the Chunk type itself and used to generate the Chunk +// checksum. So it is imported to take care of the JSON representation of the +// resulting Go struct. +message ChunkRef { + uint64 fingerprint = 1 [(gogoproto.jsontag) = "fingerprint"]; + string user_id = 2 [ + (gogoproto.customname) = "UserID", + (gogoproto.jsontag) = "userID" + ]; + int64 from = 3 [ + (gogoproto.jsontag) = "from", + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 4 [ + (gogoproto.jsontag) = "through", + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + + // The checksum is not written to the external storage. We use crc32, + // Castagnoli table. See http://www.evanjones.ca/crc32c.html. + uint32 checksum = 5 [(gogoproto.jsontag) = "-"]; +} + +message ChunkRefGroup { + repeated ChunkRef refs = 1 [(gogoproto.jsontag) = "refs"]; +} + +message LabelValuesForMetricNameRequest { + string metric_name = 1; + string label_name = 2; + int64 from = 3 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 4 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 5; +} + +message LabelNamesForMetricNameRequest { + string metric_name = 1; + int64 from = 2 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 3 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 4; +} + +// TODO(owen-d): fix. This will break rollouts as soon as the internal repr is changed. +message LineFilter { + bytes raw = 1; +} + +message GetChunkRefRequest { + int64 from = 1 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 2 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 3; + // TODO(salvacorts): Delete this field once the weekly release is done. + repeated LineFilter filters = 4 [ + (gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/logql/syntax.LineFilter", + (gogoproto.nullable) = false + ]; + Plan plan = 5 [ + (gogoproto.customtype) = "github.com/grafana/loki/v3/pkg/querier/plan.QueryPlan", + (gogoproto.nullable) = false + ]; +} + +message GetChunkRefResponse { + repeated ChunkRef refs = 1; + stats.Index stats = 2 [(gogoproto.nullable) = false]; +} + +message GetSeriesRequest { + int64 from = 1 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 2 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 3; +} + +message GetSeriesResponse { + repeated IndexSeries series = 1 [(gogoproto.nullable) = false]; +} + +// Series calls to the TSDB Index +message IndexSeries { + repeated LabelPair labels = 1 [ + (gogoproto.nullable) = false, + (gogoproto.customtype) = "LabelAdapter" + ]; +} + +message QueryIndexResponse { + string QueryKey = 1; + repeated Row rows = 2; +} + +message Row { + bytes rangeValue = 1; + bytes value = 2; +} + +message QueryIndexRequest { + repeated IndexQuery Queries = 1; +} + +message IndexQuery { + string tableName = 1; + string hashValue = 2; + bytes rangeValuePrefix = 3; + bytes rangeValueStart = 4; + bytes valueEqual = 5; +} + +message IndexStatsRequest { + int64 from = 1 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 2 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 3; + // TODO(owen-d): add shards to grpc calls so we don't have + // to extract via labels +} + +message IndexStatsResponse { + uint64 streams = 1 [(gogoproto.jsontag) = "streams"]; + uint64 chunks = 2 [(gogoproto.jsontag) = "chunks"]; + uint64 bytes = 3 [(gogoproto.jsontag) = "bytes"]; + uint64 entries = 4 [(gogoproto.jsontag) = "entries"]; +} + +message VolumeRequest { + int64 from = 1 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 2 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 3; + int32 limit = 4; + int64 step = 5; + repeated string targetLabels = 6; + string aggregateBy = 7; + resultscache.CachingOptions cachingOptions = 8 [(gogoproto.nullable) = false]; +} + +message VolumeResponse { + repeated Volume volumes = 1 [(gogoproto.nullable) = false]; + int32 limit = 2; +} + +message Volume { + string name = 1 [(gogoproto.jsontag) = "name"]; + uint64 volume = 3 [(gogoproto.jsontag) = "volume"]; +} + +message DetectedFieldsRequest { + google.protobuf.Timestamp start = 1 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + string query = 3; // Naming this query instead of match because this should be with queryrangebase.Request interface + uint32 lineLimit = 4; + uint32 limit = 5; + int64 step = 6; + bool values = 7; // True to fetch detected field values, false for fetch detected field label names. + string name = 8; // Name of the detected field to fetch values for. +} + +message DetectedFieldsResponse { + repeated DetectedField fields = 1 [(gogoproto.jsontag) = "fields,omitempty"]; + uint32 limit = 2 [(gogoproto.jsontag) = "limit,omitempty"]; + repeated string values = 3 [(gogoproto.jsontag) = "values,omitempty"]; +} + +// TODO: make the detected field include the serialized sketch +// we only want cardinality in the JSON response +message DetectedField { + string label = 1; + string type = 2 [(gogoproto.casttype) = "DetectedFieldType"]; + uint64 cardinality = 3; + repeated string parsers = 4 [(gogoproto.jsontag) = "parsers"]; + bytes sketch = 5 [(gogoproto.jsontag) = "sketch,omitempty"]; +} + +message DetectedLabelsRequest { + google.protobuf.Timestamp start = 1 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + string query = 3; +} + +message DetectedLabelsResponse { + repeated DetectedLabel detectedLabels = 1; +} + +message DetectedLabel { + string label = 1; + uint64 cardinality = 2; + bytes sketch = 3 [(gogoproto.jsontag) = "sketch,omitempty"]; +} diff --git a/loki-api/generate/proto/pkg/logqlmodel/stats/stats.proto b/loki-api/generate/proto/pkg/logqlmodel/stats/stats.proto index fdf3f14..fd22549 100644 --- a/loki-api/generate/proto/pkg/logqlmodel/stats/stats.proto +++ b/loki-api/generate/proto/pkg/logqlmodel/stats/stats.proto @@ -4,25 +4,76 @@ package stats; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +option go_package = "github.com/grafana/loki/v3/pkg/logqlmodel/stats"; option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; -option go_package = "github.com/grafana/loki/pkg/logqlmodel/stats"; - // Result contains LogQL query statistics. message Result { - Summary summary = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "summary"]; - Querier querier = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "querier"]; - Ingester ingester = 3 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "ingester"]; + Summary summary = 1 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "summary" + ]; + Querier querier = 2 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "querier" + ]; + Ingester ingester = 3 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "ingester" + ]; + Caches caches = 4 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "cache" + ]; + Index index = 5 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "index" + ]; +} + +message Caches { + Cache chunk = 1 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "chunk" + ]; + Cache index = 2 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "index" + ]; + Cache result = 3 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "result" + ]; + Cache statsResult = 4 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "statsResult" + ]; + Cache volumeResult = 5 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "volumeResult" + ]; + Cache seriesResult = 6 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "seriesResult" + ]; + Cache labelResult = 7 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "labelResult" + ]; + Cache instantMetricResult = 8 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "instantMetricResult" + ]; } // Summary is the summary of a query statistics. message Summary { - // Total bytes processed per second. + // Total bytes processed per second. int64 bytesProcessedPerSecond = 1 [(gogoproto.jsontag) = "bytesProcessedPerSecond"]; // Total lines processed per second. int64 linesProcessedPerSecond = 2 [(gogoproto.jsontag) = "linesProcessedPerSecond"]; - // Total bytes processed. + // Total bytes processed. Includes structured metadata bytes. int64 totalBytesProcessed = 3 [(gogoproto.jsontag) = "totalBytesProcessed"]; // Total lines processed. int64 totalLinesProcessed = 4 [(gogoproto.jsontag) = "totalLinesProcessed"]; @@ -34,12 +85,40 @@ message Summary { // In addition to internal calculations this is also returned by the HTTP API. // Grafana expects time values to be returned in seconds as float. double queueTime = 6 [(gogoproto.jsontag) = "queueTime"]; - // Total of subqueries created to fulfill this query. + // Subqueries exists for backwards compatibility reasons and is deprecated. Do not use. + // Instead use splits and shards int64 subqueries = 7 [(gogoproto.jsontag) = "subqueries"]; + // Total number of result entries returned + int64 totalEntriesReturned = 8 [(gogoproto.jsontag) = "totalEntriesReturned"]; + // Total number of splits by time + int64 splits = 9 [(gogoproto.jsontag) = "splits"]; + // Total number of shards + int64 shards = 10 [(gogoproto.jsontag) = "shards"]; + // Total lines post query filtering + int64 totalPostFilterLines = 11 [(gogoproto.jsontag) = "totalPostFilterLines"]; + // Total bytes processed of metadata. + int64 totalStructuredMetadataBytesProcessed = 12 [(gogoproto.jsontag) = "totalStructuredMetadataBytesProcessed"]; +} + +// Statistics from Index queries +// TODO(owen-d): include bytes. +// Needs some index methods added to return _sized_ chunk refs to know +message Index { + // Total chunks + int64 totalChunks = 1 [(gogoproto.jsontag) = "totalChunks"]; + // Post-filtered chunks + int64 postFilterChunks = 2 [(gogoproto.jsontag) = "postFilterChunks"]; + // Nanosecond duration spent fetching shards + int64 shardsDuration = 3 [(gogoproto.jsontag) = "shardsDuration"]; + // Indicates whether the query used blooms to filter chunks + bool usedBloomFilters = 4 [(gogoproto.jsontag) = "usedBloomFilters"]; } message Querier { - Store store = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "store"]; + Store store = 1 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "store" + ]; } message Ingester { @@ -52,27 +131,43 @@ message Ingester { // Total lines sent by ingesters. int64 totalLinesSent = 4 [(gogoproto.jsontag) = "totalLinesSent"]; - Store store = 5 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "store"]; - + Store store = 5 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "store" + ]; } message Store { - // The total of chunk reference fetched from index. - int64 totalChunksRef = 1 [(gogoproto.jsontag) = "totalChunksRef"]; - // Total number of chunks fetched. - int64 totalChunksDownloaded = 2 [(gogoproto.jsontag) = "totalChunksDownloaded"]; - // Time spent fetching chunks in nanoseconds. - int64 chunksDownloadTime = 3 [(gogoproto.jsontag) = "chunksDownloadTime"]; - - Chunk chunk = 4 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "chunk"]; + // The total of chunk reference fetched from index. + int64 totalChunksRef = 1 [(gogoproto.jsontag) = "totalChunksRef"]; + // Total number of chunks fetched. + int64 totalChunksDownloaded = 2 [(gogoproto.jsontag) = "totalChunksDownloaded"]; + // Time spent fetching chunks in nanoseconds. + int64 chunksDownloadTime = 3 [(gogoproto.jsontag) = "chunksDownloadTime"]; + // Whether the query referenced structured metadata + bool queryReferencedStructured = 13 [(gogoproto.jsontag) = "queryReferencedStructuredMetadata"]; + + Chunk chunk = 4 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "chunk" + ]; + + // Time spent fetching chunk refs from index. + int64 chunkRefsFetchTime = 5 [(gogoproto.jsontag) = "chunkRefsFetchTime"]; + + // Time spent being blocked on congestion control. + int64 congestionControlLatency = 6 [(gogoproto.jsontag) = "congestionControlLatency"]; + + // Total number of lines filtered by pipeline wrapper. + int64 pipelineWrapperFilteredLines = 7 [(gogoproto.jsontag) = "pipelineWrapperFilteredLines"]; } message Chunk { - // Total bytes processed but was already in memory. (found in the headchunk) + // Total bytes processed but was already in memory (found in the headchunk). Includes structured metadata bytes. int64 headChunkBytes = 4 [(gogoproto.jsontag) = "headChunkBytes"]; // Total lines processed but was already in memory. (found in the headchunk) int64 headChunkLines = 5 [(gogoproto.jsontag) = "headChunkLines"]; - // Total bytes decompressed and processed from chunks. + // Total bytes decompressed and processed from chunks. Includes structured metadata bytes. int64 decompressedBytes = 6 [(gogoproto.jsontag) = "decompressedBytes"]; // Total lines decompressed and processed from chunks. int64 decompressedLines = 7 [(gogoproto.jsontag) = "decompressedLines"]; @@ -80,4 +175,21 @@ message Chunk { int64 compressedBytes = 8 [(gogoproto.jsontag) = "compressedBytes"]; // Total duplicates found while processing. int64 totalDuplicates = 9 [(gogoproto.jsontag) = "totalDuplicates"]; + // Total lines post filtering + int64 postFilterLines = 10 [(gogoproto.jsontag) = "postFilterLines"]; + // Total bytes processed for metadata but was already in memory. (found in the headchunk) + int64 headChunkStructuredMetadataBytes = 11 [(gogoproto.jsontag) = "headChunkStructuredMetadataBytes"]; + // Total bytes of entries metadata decompressed and processed from chunks. + int64 decompressedStructuredMetadataBytes = 12 [(gogoproto.jsontag) = "decompressedStructuredMetadataBytes"]; } + +message Cache { + int32 entriesFound = 1 [(gogoproto.jsontag) = "entriesFound"]; + int32 entriesRequested = 2 [(gogoproto.jsontag) = "entriesRequested"]; + int32 entriesStored = 3 [(gogoproto.jsontag) = "entriesStored"]; + int64 bytesReceived = 4 [(gogoproto.jsontag) = "bytesReceived"]; + int64 bytesSent = 5 [(gogoproto.jsontag) = "bytesSent"]; + int32 requests = 6 [(gogoproto.jsontag) = "requests"]; + int64 downloadTime = 7 [(gogoproto.jsontag) = "downloadTime"]; + int64 queryLengthServed = 8 [(gogoproto.jsontag) = "queryLengthServed"]; +} \ No newline at end of file diff --git a/loki-api/generate/proto/pkg/push/push.proto b/loki-api/generate/proto/pkg/push/push.proto new file mode 100644 index 0000000..4887fb0 --- /dev/null +++ b/loki-api/generate/proto/pkg/push/push.proto @@ -0,0 +1,56 @@ +syntax = "proto3"; + +package logproto; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/grafana/loki/pkg/push"; + +service Pusher { + rpc Push(PushRequest) returns (PushResponse) {} +} + +message PushRequest { + repeated StreamAdapter streams = 1 [ + (gogoproto.jsontag) = "streams", + (gogoproto.customtype) = "Stream" + ]; +} + +message PushResponse {} + +message StreamAdapter { + string labels = 1 [(gogoproto.jsontag) = "labels"]; + repeated EntryAdapter entries = 2 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "entries" + ]; + // hash contains the original hash of the stream. + uint64 hash = 3 [(gogoproto.jsontag) = "-"]; +} + +message LabelPairAdapter { + string name = 1; + string value = 2; +} + +message EntryAdapter { + google.protobuf.Timestamp timestamp = 1 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "ts" + ]; + string line = 2 [(gogoproto.jsontag) = "line"]; + repeated LabelPairAdapter structuredMetadata = 3 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "structuredMetadata,omitempty" + ]; + // This field shouldn't be used by clients to push data to Loki. + // It is only used by Loki to return parsed log lines in query responses. + // TODO: Remove this field from the write path Proto. + repeated LabelPairAdapter parsed = 4 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "parsed,omitempty" + ]; +} diff --git a/loki-api/generate/proto/pkg/storage/chunk/cache/resultscache/types.proto b/loki-api/generate/proto/pkg/storage/chunk/cache/resultscache/types.proto new file mode 100644 index 0000000..344f8bf --- /dev/null +++ b/loki-api/generate/proto/pkg/storage/chunk/cache/resultscache/types.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package resultscache; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "google/protobuf/any.proto"; + +option go_package = "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +// Defined here to prevent circular imports between logproto & queryrangebase +message CachingOptions { + bool disabled = 1; +} + +message CachedResponse { + string key = 1 [(gogoproto.jsontag) = "key"]; + + // List of cached responses; non-overlapping and in order. + repeated Extent extents = 2 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "extents" + ]; +} + +message Extent { + int64 start = 1 [(gogoproto.jsontag) = "start"]; + int64 end = 2 [(gogoproto.jsontag) = "end"]; + // reserved the previous key to ensure cache transition + reserved 3; + string trace_id = 4 [(gogoproto.jsontag) = "-"]; + google.protobuf.Any response = 5 [(gogoproto.jsontag) = "response"]; +} diff --git a/loki-api/generate/src/main.rs b/loki-api/generate/src/main.rs index 35c0e3a..8bd3aba 100644 --- a/loki-api/generate/src/main.rs +++ b/loki-api/generate/src/main.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; fn main() -> io::Result<()> { let out_dir = PathBuf::from(env!("OUT_DIR")); fs::copy(out_dir.join("logproto.rs"), "../src/logproto.rs")?; + fs::copy(out_dir.join("resultscache.rs"), "../src/resultscache.rs")?; fs::copy(out_dir.join("stats.rs"), "../src/stats.rs")?; Ok(()) } diff --git a/loki-api/src/lib.rs b/loki-api/src/lib.rs index 347c6a5..35f54c2 100644 --- a/loki-api/src/lib.rs +++ b/loki-api/src/lib.rs @@ -3,5 +3,9 @@ pub extern crate prost_types; #[rustfmt::skip] pub mod logproto; + +#[rustfmt::skip] +pub mod resultscache; + #[rustfmt::skip] pub mod stats; diff --git a/loki-api/src/logproto.rs b/loki-api/src/logproto.rs index 8f6dca4..7e117ce 100644 --- a/loki-api/src/logproto.rs +++ b/loki-api/src/logproto.rs @@ -1,231 +1,634 @@ +// This file is @generated by prost-build. #[derive(Clone, PartialEq, ::prost::Message)] pub struct PushRequest { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub streams: ::prost::alloc::vec::Vec, } +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct PushResponse {} #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PushResponse { +pub struct StreamAdapter { + #[prost(string, tag = "1")] + pub labels: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub entries: ::prost::alloc::vec::Vec, + /// hash contains the original hash of the stream. + #[prost(uint64, tag = "3")] + pub hash: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LabelPairAdapter { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub value: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EntryAdapter { + #[prost(message, optional, tag = "1")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag = "2")] + pub line: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "3")] + pub structured_metadata: ::prost::alloc::vec::Vec, + /// This field shouldn't be used by clients to push data to Loki. + /// It is only used by Loki to return parsed log lines in query responses. + /// TODO: Remove this field from the write path Proto. + #[prost(message, repeated, tag = "4")] + pub parsed: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LabelToValuesResponse { + #[prost(map = "string, message", tag = "1")] + pub labels: ::std::collections::HashMap< + ::prost::alloc::string::String, + UniqueLabelValues, + >, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UniqueLabelValues { + #[prost(string, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct StreamRatesRequest {} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct StreamRatesResponse { + #[prost(message, repeated, tag = "1")] + pub stream_rates: ::prost::alloc::vec::Vec, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct StreamMetadata { + #[prost(uint64, tag = "1")] + pub stream_hash: u64, + #[prost(uint64, tag = "2")] + pub entries_size: u64, + #[prost(uint64, tag = "3")] + pub structured_metadata_size: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExceedsLimitsRequest { + #[prost(string, tag = "1")] + pub tenant: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub streams: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExceedsLimitsResponse { + #[prost(string, tag = "1")] + pub tenant: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub rejected_streams: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RejectedStream { + #[prost(uint64, tag = "1")] + pub stream_hash: u64, + #[prost(string, tag = "2")] + pub reason: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetStreamUsageRequest { + #[prost(string, tag = "1")] + pub tenant: ::prost::alloc::string::String, + #[prost(int32, repeated, tag = "2")] + pub partitions: ::prost::alloc::vec::Vec, + #[prost(uint64, repeated, tag = "3")] + pub stream_hashes: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetStreamUsageResponse { + #[prost(string, tag = "1")] + pub tenant: ::prost::alloc::string::String, + #[prost(uint64, tag = "2")] + pub active_streams: u64, + #[prost(int64, tag = "3")] + pub rate: i64, + #[prost(uint64, repeated, tag = "4")] + pub unknown_streams: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct StreamRate { + #[prost(uint64, tag = "1")] + pub stream_hash: u64, + #[prost(uint64, tag = "2")] + pub stream_hash_no_shard: u64, + /// rate in plain bytes. + #[prost(int64, tag = "3")] + pub rate: i64, + #[prost(string, tag = "4")] + pub tenant: ::prost::alloc::string::String, + #[prost(uint32, tag = "5")] + pub pushes: u32, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetAssignedPartitionsRequest {} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetAssignedPartitionsResponse { + #[prost(map = "int32, int64", tag = "1")] + pub assigned_partitions: ::std::collections::HashMap, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct QueryRequest { - #[prost(string, tag="1")] + #[deprecated] + #[prost(string, tag = "1")] pub selector: ::prost::alloc::string::String, - #[prost(uint32, tag="2")] + #[prost(uint32, tag = "2")] pub limit: u32, - #[prost(message, optional, tag="3")] + #[prost(message, optional, tag = "3")] pub start: ::core::option::Option<::prost_types::Timestamp>, - #[prost(message, optional, tag="4")] + #[prost(message, optional, tag = "4")] pub end: ::core::option::Option<::prost_types::Timestamp>, - #[prost(enumeration="Direction", tag="5")] + #[prost(enumeration = "Direction", tag = "5")] pub direction: i32, - #[prost(string, repeated, tag="7")] + #[prost(string, repeated, tag = "7")] pub shards: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(message, repeated, tag="8")] + #[prost(message, repeated, tag = "8")] pub deletes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "9")] + pub plan: ::core::option::Option, + /// If populated, these represent the chunk references that the querier should + /// use to fetch the data, plus any other chunks reported by ingesters. + #[prost(message, optional, tag = "10")] + pub store_chunks: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SampleQueryRequest { - #[prost(string, tag="1")] + /// mark as reserved once we've fully migrated to plan. + #[deprecated] + #[prost(string, tag = "1")] pub selector: ::prost::alloc::string::String, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub start: ::core::option::Option<::prost_types::Timestamp>, - #[prost(message, optional, tag="3")] + #[prost(message, optional, tag = "3")] pub end: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, repeated, tag="4")] + #[prost(string, repeated, tag = "4")] pub shards: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(message, repeated, tag="5")] + #[prost(message, repeated, tag = "5")] pub deletes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "6")] + pub plan: ::core::option::Option, + /// If populated, these represent the chunk references that the querier should + /// use to fetch the data, plus any other chunks reported by ingesters. + #[prost(message, optional, tag = "10")] + pub store_chunks: ::core::option::Option, +} +/// TODO(owen-d): fix. This will break rollouts as soon as the internal repr is changed. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Plan { + #[prost(bytes = "vec", tag = "1")] + pub raw: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Delete { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub selector: ::prost::alloc::string::String, - #[prost(int64, tag="2")] + #[prost(int64, tag = "2")] pub start: i64, - #[prost(int64, tag="3")] + #[prost(int64, tag = "3")] pub end: i64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct QueryResponse { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub streams: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub stats: ::core::option::Option, + #[prost(string, repeated, tag = "3")] + pub warnings: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SampleQueryResponse { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub series: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub stats: ::core::option::Option, + #[prost(string, repeated, tag = "3")] + pub warnings: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct LabelRequest { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, /// True to fetch label values, false for fetch labels names. - #[prost(bool, tag="2")] + #[prost(bool, tag = "2")] pub values: bool, - #[prost(message, optional, tag="3")] + #[prost(message, optional, tag = "3")] pub start: ::core::option::Option<::prost_types::Timestamp>, - #[prost(message, optional, tag="4")] + #[prost(message, optional, tag = "4")] pub end: ::core::option::Option<::prost_types::Timestamp>, + /// Naming this query instead of match because this should be with queryrangebase.Request interface + #[prost(string, tag = "5")] + pub query: ::prost::alloc::string::String, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct LabelResponse { - #[prost(string, repeated, tag="1")] + #[prost(string, repeated, tag = "1")] pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct StreamAdapter { - #[prost(string, tag="1")] - pub labels: ::prost::alloc::string::String, - #[prost(message, repeated, tag="2")] - pub entries: ::prost::alloc::vec::Vec, - /// hash contains the original hash of the stream. - #[prost(uint64, tag="3")] - pub hash: u64, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct EntryAdapter { - #[prost(message, optional, tag="1")] - pub timestamp: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, tag="2")] - pub line: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Sample { - #[prost(int64, tag="1")] + #[prost(int64, tag = "1")] pub timestamp: i64, - #[prost(double, tag="2")] + #[prost(double, tag = "2")] pub value: f64, - #[prost(uint64, tag="3")] + #[prost(uint64, tag = "3")] pub hash: u64, } /// LegacySample exists for backwards compatibility reasons and is deprecated. Do not use. -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct LegacySample { - #[prost(double, tag="1")] + #[prost(double, tag = "1")] pub value: f64, - #[prost(int64, tag="2")] + #[prost(int64, tag = "2")] pub timestamp_ms: i64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Series { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub labels: ::prost::alloc::string::String, - #[prost(message, repeated, tag="2")] + #[prost(message, repeated, tag = "2")] pub samples: ::prost::alloc::vec::Vec, - #[prost(uint64, tag="3")] + #[prost(uint64, tag = "3")] pub stream_hash: u64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TailRequest { - #[prost(string, tag="1")] + #[deprecated] + #[prost(string, tag = "1")] pub query: ::prost::alloc::string::String, - #[prost(uint32, tag="3")] + #[prost(uint32, tag = "3")] pub delay_for: u32, - #[prost(uint32, tag="4")] + #[prost(uint32, tag = "4")] pub limit: u32, - #[prost(message, optional, tag="5")] + #[prost(message, optional, tag = "5")] pub start: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "6")] + pub plan: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TailResponse { - #[prost(message, optional, tag="1")] + #[prost(message, optional, tag = "1")] pub stream: ::core::option::Option, - #[prost(message, repeated, tag="2")] + #[prost(message, repeated, tag = "2")] pub dropped_streams: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SeriesRequest { - #[prost(message, optional, tag="1")] + #[prost(message, optional, tag = "1")] pub start: ::core::option::Option<::prost_types::Timestamp>, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub end: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, repeated, tag="3")] + #[prost(string, repeated, tag = "3")] pub groups: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(string, repeated, tag="4")] + #[prost(string, repeated, tag = "4")] pub shards: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SeriesResponse { - #[prost(message, repeated, tag="1")] + #[prost(message, repeated, tag = "1")] pub series: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SeriesIdentifier { - #[prost(map="string, string", tag="1")] - pub labels: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + #[prost(message, repeated, tag = "1")] + pub labels: ::prost::alloc::vec::Vec, +} +/// Nested message and enum types in `SeriesIdentifier`. +pub mod series_identifier { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct LabelsEntry { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub value: ::prost::alloc::string::String, + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct DroppedStream { - #[prost(message, optional, tag="1")] + #[prost(message, optional, tag = "1")] pub from: ::core::option::Option<::prost_types::Timestamp>, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub to: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, tag="3")] + #[prost(string, tag = "3")] pub labels: ::prost::alloc::string::String, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TimeSeriesChunk { - #[prost(string, tag="1")] - pub from_ingester_id: ::prost::alloc::string::String, - #[prost(string, tag="2")] - pub user_id: ::prost::alloc::string::String, - #[prost(message, repeated, tag="3")] - pub labels: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag="4")] - pub chunks: ::prost::alloc::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] pub struct LabelPair { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, - #[prost(string, tag="2")] + #[prost(string, tag = "2")] pub value: ::prost::alloc::string::String, } /// LegacyLabelPair exists for backwards compatibility reasons and is deprecated. Do not use. +/// Use LabelPair instead. #[derive(Clone, PartialEq, ::prost::Message)] pub struct LegacyLabelPair { - #[prost(bytes="vec", tag="1")] + #[prost(bytes = "vec", tag = "1")] pub name: ::prost::alloc::vec::Vec, - #[prost(bytes="vec", tag="2")] + #[prost(bytes = "vec", tag = "2")] pub value: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Chunk { - #[prost(bytes="vec", tag="1")] + #[prost(bytes = "vec", tag = "1")] pub data: ::prost::alloc::vec::Vec, } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct TransferChunksResponse { -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct TailersCountRequest { -} -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct TailersCountRequest {} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct TailersCountResponse { - #[prost(uint32, tag="1")] + #[prost(uint32, tag = "1")] pub count: u32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetChunkIDsRequest { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub matchers: ::prost::alloc::string::String, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub start: ::core::option::Option<::prost_types::Timestamp>, - #[prost(message, optional, tag="3")] + #[prost(message, optional, tag = "3")] pub end: ::core::option::Option<::prost_types::Timestamp>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetChunkIDsResponse { - #[prost(string, repeated, tag="1")] + #[prost(string, repeated, tag = "1")] pub chunk_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } +/// ChunkRef contains the metadata to reference a Chunk. +/// It is embedded by the Chunk type itself and used to generate the Chunk +/// checksum. So it is imported to take care of the JSON representation of the +/// resulting Go struct. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChunkRef { + #[prost(uint64, tag = "1")] + pub fingerprint: u64, + #[prost(string, tag = "2")] + pub user_id: ::prost::alloc::string::String, + #[prost(int64, tag = "3")] + pub from: i64, + #[prost(int64, tag = "4")] + pub through: i64, + /// The checksum is not written to the external storage. We use crc32, + /// Castagnoli table. See + #[prost(uint32, tag = "5")] + pub checksum: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChunkRefGroup { + #[prost(message, repeated, tag = "1")] + pub refs: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LabelValuesForMetricNameRequest { + #[prost(string, tag = "1")] + pub metric_name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub label_name: ::prost::alloc::string::String, + #[prost(int64, tag = "3")] + pub from: i64, + #[prost(int64, tag = "4")] + pub through: i64, + #[prost(string, tag = "5")] + pub matchers: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LabelNamesForMetricNameRequest { + #[prost(string, tag = "1")] + pub metric_name: ::prost::alloc::string::String, + #[prost(int64, tag = "2")] + pub from: i64, + #[prost(int64, tag = "3")] + pub through: i64, + #[prost(string, tag = "4")] + pub matchers: ::prost::alloc::string::String, +} +/// TODO(owen-d): fix. This will break rollouts as soon as the internal repr is changed. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LineFilter { + #[prost(bytes = "vec", tag = "1")] + pub raw: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetChunkRefRequest { + #[prost(int64, tag = "1")] + pub from: i64, + #[prost(int64, tag = "2")] + pub through: i64, + #[prost(string, tag = "3")] + pub matchers: ::prost::alloc::string::String, + /// TODO(salvacorts): Delete this field once the weekly release is done. + #[prost(message, repeated, tag = "4")] + pub filters: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "5")] + pub plan: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetChunkRefResponse { + #[prost(message, repeated, tag = "1")] + pub refs: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub stats: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetSeriesRequest { + #[prost(int64, tag = "1")] + pub from: i64, + #[prost(int64, tag = "2")] + pub through: i64, + #[prost(string, tag = "3")] + pub matchers: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetSeriesResponse { + #[prost(message, repeated, tag = "1")] + pub series: ::prost::alloc::vec::Vec, +} +/// Series calls to the TSDB Index +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexSeries { + #[prost(message, repeated, tag = "1")] + pub labels: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueryIndexResponse { + #[prost(string, tag = "1")] + pub query_key: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub rows: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Row { + #[prost(bytes = "vec", tag = "1")] + pub range_value: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct QueryIndexRequest { + #[prost(message, repeated, tag = "1")] + pub queries: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexQuery { + #[prost(string, tag = "1")] + pub table_name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub hash_value: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "3")] + pub range_value_prefix: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "4")] + pub range_value_start: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "5")] + pub value_equal: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct IndexStatsRequest { + #[prost(int64, tag = "1")] + pub from: i64, + #[prost(int64, tag = "2")] + pub through: i64, + /// TODO(owen-d): add shards to grpc calls so we don't have + /// to extract via labels + #[prost(string, tag = "3")] + pub matchers: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct IndexStatsResponse { + #[prost(uint64, tag = "1")] + pub streams: u64, + #[prost(uint64, tag = "2")] + pub chunks: u64, + #[prost(uint64, tag = "3")] + pub bytes: u64, + #[prost(uint64, tag = "4")] + pub entries: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VolumeRequest { + #[prost(int64, tag = "1")] + pub from: i64, + #[prost(int64, tag = "2")] + pub through: i64, + #[prost(string, tag = "3")] + pub matchers: ::prost::alloc::string::String, + #[prost(int32, tag = "4")] + pub limit: i32, + #[prost(int64, tag = "5")] + pub step: i64, + #[prost(string, repeated, tag = "6")] + pub target_labels: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, tag = "7")] + pub aggregate_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "8")] + pub caching_options: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VolumeResponse { + #[prost(message, repeated, tag = "1")] + pub volumes: ::prost::alloc::vec::Vec, + #[prost(int32, tag = "2")] + pub limit: i32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Volume { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(uint64, tag = "3")] + pub volume: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DetectedFieldsRequest { + #[prost(message, optional, tag = "1")] + pub start: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "2")] + pub end: ::core::option::Option<::prost_types::Timestamp>, + /// Naming this query instead of match because this should be with queryrangebase.Request interface + #[prost(string, tag = "3")] + pub query: ::prost::alloc::string::String, + #[prost(uint32, tag = "4")] + pub line_limit: u32, + #[prost(uint32, tag = "5")] + pub limit: u32, + #[prost(int64, tag = "6")] + pub step: i64, + /// True to fetch detected field values, false for fetch detected field label names. + #[prost(bool, tag = "7")] + pub values: bool, + /// Name of the detected field to fetch values for. + #[prost(string, tag = "8")] + pub name: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DetectedFieldsResponse { + #[prost(message, repeated, tag = "1")] + pub fields: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "2")] + pub limit: u32, + #[prost(string, repeated, tag = "3")] + pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +/// TODO: make the detected field include the serialized sketch +/// we only want cardinality in the JSON response +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DetectedField { + #[prost(string, tag = "1")] + pub label: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub r#type: ::prost::alloc::string::String, + #[prost(uint64, tag = "3")] + pub cardinality: u64, + #[prost(string, repeated, tag = "4")] + pub parsers: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "5")] + pub sketch: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DetectedLabelsRequest { + #[prost(message, optional, tag = "1")] + pub start: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "2")] + pub end: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag = "3")] + pub query: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DetectedLabelsResponse { + #[prost(message, repeated, tag = "1")] + pub detected_labels: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DetectedLabel { + #[prost(string, tag = "1")] + pub label: ::prost::alloc::string::String, + #[prost(uint64, tag = "2")] + pub cardinality: u64, + #[prost(bytes = "vec", tag = "3")] + pub sketch: ::prost::alloc::vec::Vec, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum Direction { Forward = 0, Backward = 1, } +impl Direction { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Forward => "FORWARD", + Self::Backward => "BACKWARD", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "FORWARD" => Some(Self::Forward), + "BACKWARD" => Some(Self::Backward), + _ => None, + } + } +} diff --git a/loki-api/src/resultscache.rs b/loki-api/src/resultscache.rs new file mode 100644 index 0000000..a8ca1fa --- /dev/null +++ b/loki-api/src/resultscache.rs @@ -0,0 +1,26 @@ +// This file is @generated by prost-build. +/// Defined here to prevent circular imports between logproto & queryrangebase +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct CachingOptions { + #[prost(bool, tag = "1")] + pub disabled: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CachedResponse { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + /// List of cached responses; non-overlapping and in order. + #[prost(message, repeated, tag = "2")] + pub extents: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Extent { + #[prost(int64, tag = "1")] + pub start: i64, + #[prost(int64, tag = "2")] + pub end: i64, + #[prost(string, tag = "4")] + pub trace_id: ::prost::alloc::string::String, + #[prost(message, optional, tag = "5")] + pub response: ::core::option::Option<::prost_types::Any>, +} diff --git a/loki-api/src/stats.rs b/loki-api/src/stats.rs index 31d6aad..3f304cf 100644 --- a/loki-api/src/stats.rs +++ b/loki-api/src/stats.rs @@ -1,96 +1,194 @@ +// This file is @generated by prost-build. /// Result contains LogQL query statistics. -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Result { - #[prost(message, optional, tag="1")] + #[prost(message, optional, tag = "1")] pub summary: ::core::option::Option, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub querier: ::core::option::Option, - #[prost(message, optional, tag="3")] + #[prost(message, optional, tag = "3")] pub ingester: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub caches: ::core::option::Option, + #[prost(message, optional, tag = "5")] + pub index: ::core::option::Option, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Caches { + #[prost(message, optional, tag = "1")] + pub chunk: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub index: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub result: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub stats_result: ::core::option::Option, + #[prost(message, optional, tag = "5")] + pub volume_result: ::core::option::Option, + #[prost(message, optional, tag = "6")] + pub series_result: ::core::option::Option, + #[prost(message, optional, tag = "7")] + pub label_result: ::core::option::Option, + #[prost(message, optional, tag = "8")] + pub instant_metric_result: ::core::option::Option, } /// Summary is the summary of a query statistics. -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Summary { /// Total bytes processed per second. - #[prost(int64, tag="1")] + #[prost(int64, tag = "1")] pub bytes_processed_per_second: i64, /// Total lines processed per second. - #[prost(int64, tag="2")] + #[prost(int64, tag = "2")] pub lines_processed_per_second: i64, - /// Total bytes processed. - #[prost(int64, tag="3")] + /// Total bytes processed. Includes structured metadata bytes. + #[prost(int64, tag = "3")] pub total_bytes_processed: i64, /// Total lines processed. - #[prost(int64, tag="4")] + #[prost(int64, tag = "4")] pub total_lines_processed: i64, /// Execution time in seconds. /// In addition to internal calculations this is also returned by the HTTP API. /// Grafana expects time values to be returned in seconds as float. - #[prost(double, tag="5")] + #[prost(double, tag = "5")] pub exec_time: f64, /// Queue time in seconds. /// In addition to internal calculations this is also returned by the HTTP API. /// Grafana expects time values to be returned in seconds as float. - #[prost(double, tag="6")] + #[prost(double, tag = "6")] pub queue_time: f64, - /// Total of subqueries created to fulfill this query. - #[prost(int64, tag="7")] + /// Subqueries exists for backwards compatibility reasons and is deprecated. Do not use. + /// Instead use splits and shards + #[prost(int64, tag = "7")] pub subqueries: i64, + /// Total number of result entries returned + #[prost(int64, tag = "8")] + pub total_entries_returned: i64, + /// Total number of splits by time + #[prost(int64, tag = "9")] + pub splits: i64, + /// Total number of shards + #[prost(int64, tag = "10")] + pub shards: i64, + /// Total lines post query filtering + #[prost(int64, tag = "11")] + pub total_post_filter_lines: i64, + /// Total bytes processed of metadata. + #[prost(int64, tag = "12")] + pub total_structured_metadata_bytes_processed: i64, +} +/// Statistics from Index queries +/// TODO(owen-d): include bytes. +/// Needs some index methods added to return _sized_ chunk refs to know +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Index { + /// Total chunks + #[prost(int64, tag = "1")] + pub total_chunks: i64, + /// Post-filtered chunks + #[prost(int64, tag = "2")] + pub post_filter_chunks: i64, + /// Nanosecond duration spent fetching shards + #[prost(int64, tag = "3")] + pub shards_duration: i64, + /// Indicates whether the query used blooms to filter chunks + #[prost(bool, tag = "4")] + pub used_bloom_filters: bool, } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Querier { - #[prost(message, optional, tag="1")] + #[prost(message, optional, tag = "1")] pub store: ::core::option::Option, } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Ingester { /// Total ingester reached for this query. - #[prost(int32, tag="1")] + #[prost(int32, tag = "1")] pub total_reached: i32, /// Total of chunks matched by the query from ingesters - #[prost(int64, tag="2")] + #[prost(int64, tag = "2")] pub total_chunks_matched: i64, /// Total of batches sent from ingesters. - #[prost(int64, tag="3")] + #[prost(int64, tag = "3")] pub total_batches: i64, /// Total lines sent by ingesters. - #[prost(int64, tag="4")] + #[prost(int64, tag = "4")] pub total_lines_sent: i64, - #[prost(message, optional, tag="5")] + #[prost(message, optional, tag = "5")] pub store: ::core::option::Option, } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Store { /// The total of chunk reference fetched from index. - #[prost(int64, tag="1")] + #[prost(int64, tag = "1")] pub total_chunks_ref: i64, /// Total number of chunks fetched. - #[prost(int64, tag="2")] + #[prost(int64, tag = "2")] pub total_chunks_downloaded: i64, /// Time spent fetching chunks in nanoseconds. - #[prost(int64, tag="3")] + #[prost(int64, tag = "3")] pub chunks_download_time: i64, - #[prost(message, optional, tag="4")] + /// Whether the query referenced structured metadata + #[prost(bool, tag = "13")] + pub query_referenced_structured: bool, + #[prost(message, optional, tag = "4")] pub chunk: ::core::option::Option, + /// Time spent fetching chunk refs from index. + #[prost(int64, tag = "5")] + pub chunk_refs_fetch_time: i64, + /// Time spent being blocked on congestion control. + #[prost(int64, tag = "6")] + pub congestion_control_latency: i64, + /// Total number of lines filtered by pipeline wrapper. + #[prost(int64, tag = "7")] + pub pipeline_wrapper_filtered_lines: i64, } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Chunk { - /// Total bytes processed but was already in memory. (found in the headchunk) - #[prost(int64, tag="4")] + /// Total bytes processed but was already in memory (found in the headchunk). Includes structured metadata bytes. + #[prost(int64, tag = "4")] pub head_chunk_bytes: i64, /// Total lines processed but was already in memory. (found in the headchunk) - #[prost(int64, tag="5")] + #[prost(int64, tag = "5")] pub head_chunk_lines: i64, - /// Total bytes decompressed and processed from chunks. - #[prost(int64, tag="6")] + /// Total bytes decompressed and processed from chunks. Includes structured metadata bytes. + #[prost(int64, tag = "6")] pub decompressed_bytes: i64, /// Total lines decompressed and processed from chunks. - #[prost(int64, tag="7")] + #[prost(int64, tag = "7")] pub decompressed_lines: i64, /// Total bytes of compressed chunks (blocks) processed. - #[prost(int64, tag="8")] + #[prost(int64, tag = "8")] pub compressed_bytes: i64, /// Total duplicates found while processing. - #[prost(int64, tag="9")] + #[prost(int64, tag = "9")] pub total_duplicates: i64, + /// Total lines post filtering + #[prost(int64, tag = "10")] + pub post_filter_lines: i64, + /// Total bytes processed for metadata but was already in memory. (found in the headchunk) + #[prost(int64, tag = "11")] + pub head_chunk_structured_metadata_bytes: i64, + /// Total bytes of entries metadata decompressed and processed from chunks. + #[prost(int64, tag = "12")] + pub decompressed_structured_metadata_bytes: i64, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Cache { + #[prost(int32, tag = "1")] + pub entries_found: i32, + #[prost(int32, tag = "2")] + pub entries_requested: i32, + #[prost(int32, tag = "3")] + pub entries_stored: i32, + #[prost(int64, tag = "4")] + pub bytes_received: i64, + #[prost(int64, tag = "5")] + pub bytes_sent: i64, + #[prost(int32, tag = "6")] + pub requests: i32, + #[prost(int64, tag = "7")] + pub download_time: i64, + #[prost(int64, tag = "8")] + pub query_length_served: i64, } diff --git a/src/lib.rs b/src/lib.rs index f0b01a3..328cc7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -403,6 +403,8 @@ impl SendQueue { .map(|e| loki::EntryAdapter { timestamp: Some(e.timestamp.into()), line: e.message.clone(), + structured_metadata: vec![], + parsed: vec![], }) .collect(), // Couldn't find documentation except for the promtail source code: diff --git a/src/log_support.rs b/src/log_support.rs index 2368332..f6e7ac3 100644 --- a/src/log_support.rs +++ b/src/log_support.rs @@ -10,7 +10,7 @@ use tracing_serde::SerdeMapVisitor; pub struct SerializeEventFieldMapStrippingLog<'a>(pub &'a Event<'a>); -impl<'a> Serialize for SerializeEventFieldMapStrippingLog<'a> { +impl Serialize for SerializeEventFieldMapStrippingLog<'_> { fn serialize(&self, serializer: S) -> Result { let len = self.0.fields().count(); let serializer = serializer.serialize_map(Some(len))?;