Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go 1.23.7
// that will always use its bundled toolchain.
toolchain go1.24.2

replace github.com/grafana/dskit => ../dskit

require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b
Expand Down Expand Up @@ -124,6 +126,7 @@ require (
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang/glog v1.2.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/grafana/otel-profiling-go v0.5.1 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
Expand Down Expand Up @@ -168,6 +171,9 @@ require (
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.60.0 // indirect
go.opentelemetry.io/contrib/propagators/jaeger v1.35.0 // indirect
go.opentelemetry.io/contrib/samplers/jaegerremote v0.29.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/log v0.11.0 // indirect
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
Expand Down
16 changes: 14 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
Expand Down Expand Up @@ -1275,8 +1276,6 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20250424095029-28f3388bdf46 h1:pd6zw/GgbFuVqKU4sBNuXY7c8jgsLJQWz3GjuTKlQOo=
github.com/grafana/alerting v0.0.0-20250424095029-28f3388bdf46/go.mod h1:pMfhRxL2LZ3Pm8iy7VcVsb9CLYuBtjFYbf1oxgx7yFA=
github.com/grafana/dskit v0.0.0-20250422145853-90fa6b9a2b76 h1:kdB10/8BOe4TgEeNzNwm9mKEgIA3RoncxwGmguI0Yqs=
github.com/grafana/dskit v0.0.0-20250422145853-90fa6b9a2b76/go.mod h1:BoqjRhTdF0f3ssb1KjVh5B3Jv/HpSd1K27+qs5f8utE=
github.com/grafana/e2e v0.1.2-0.20250424195436-b10e2c0edbd6 h1:EMF+IKn0+HVuyfxGXu6HmcN0hsXuvvllJ/6CWwWZUF8=
github.com/grafana/e2e v0.1.2-0.20250424195436-b10e2c0edbd6/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ=
Expand All @@ -1289,6 +1288,8 @@ github.com/grafana/mimir-prometheus v1.8.2-0.20250423083340-007de9b763aa h1:iuyv
github.com/grafana/mimir-prometheus v1.8.2-0.20250423083340-007de9b763aa/go.mod h1:mSFhqMCiZAtXdeqwMGrOMKvd+2QWIcSUF7hEvYu/8JE=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/otel-profiling-go v0.5.1 h1:stVPKAFZSa7eGiqbYuG25VcqYksR6iWvF3YH66t4qL8=
github.com/grafana/otel-profiling-go v0.5.1/go.mod h1:ftN/t5A/4gQI19/8MoWurBEtC6gFw8Dns1sJZ9W4Tls=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20250424093311-7163931461c6 h1:yvz/Ps9RhzFpPW0vNj7VgxWsznZxzi+94Y/5COUvTes=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20250424093311-7163931461c6/go.mod h1:FGdGvhI40Dq+CTQaSzK9evuve774cgOUdGfVO04OXkw=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg=
Expand Down Expand Up @@ -1863,16 +1864,26 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.60.0/go.mod h1:CosX/aS4eHnG9D7nESYpV753l4j9q5j3SL/PUYd2lR8=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ=
go.opentelemetry.io/contrib/propagators/jaeger v1.35.0 h1:UIrZgRBHUrYRlJ4V419lVb4rs2ar0wFzKNAebaP05XU=
go.opentelemetry.io/contrib/propagators/jaeger v1.35.0/go.mod h1:0ciyFyYZxE6JqRAQvIgGRabKWDUmNdW3GAQb6y/RlFU=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.29.0 h1:VpYbyLrB5BS3blBCJMqHRIrbU4RlPnyFovR3La+1j4Q=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.29.0/go.mod h1:XAJmM2MWhiIoTO4LCLBVeE8w009TmsYk6hq1UNdXs5A=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI=
go.opentelemetry.io/otel/log v0.11.0 h1:c24Hrlk5WJ8JWcwbQxdBqxZdOK7PcP/LFtOtwpDTe3Y=
go.opentelemetry.io/otel/log v0.11.0/go.mod h1:U/sxQ83FPmT29trrifhQg+Zj2lo1/IPN1PF6RTFqdwc=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
Expand Down Expand Up @@ -2215,6 +2226,7 @@ golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
7 changes: 5 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type Distributor struct {

cfg Config
log log.Logger
ingestersRing ring.ReadRing
ingestersRing ring.ReadRingWithPerTenantConsistency
ingesterPool *ring_client.Pool
limits *validation.Overrides

Expand Down Expand Up @@ -241,6 +241,9 @@ type Config struct {
// IngestStorageConfig is dynamically injected because defined outside of distributor config.
IngestStorageConfig ingest.Config `yaml:"-"`

// IngestReadConsistency is dynamically injected because defined outside of distributor config.
IngestReadConsistency string `yaml:"-"`

// Limits for distributor
DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`
Expand Down Expand Up @@ -372,7 +375,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, ingestersRing ring.ReadRingWithPerTenantConsistency, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
Expand Down
19 changes: 17 additions & 2 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,14 @@ func (d *Distributor) getIngesterReplicationSetsForQuery(ctx context.Context) ([

// Lookup ingesters ring because ingest storage is disabled.
shardSize := d.limits.IngestionTenantShardSize(userID)
r := d.ingestersRing
r = r.ShuffleShardWithLookback(userID, shardSize, d.cfg.ShuffleShardingLookbackPeriod, time.Now())
r := ring.ReadRing(d.ingestersRing)

consistencyLevel, err := d.tenantConsistencyLevel(userID)
if err != nil {
return nil, err
}

r = d.ingestersRing.ShuffleShardWithLookbackAndConsistency(userID, shardSize, d.cfg.ShuffleShardingLookbackPeriod, time.Now(), consistencyLevel)

replicationSet, err := r.GetReplicationSetForOperation(readNoExtend)
if err != nil {
Expand All @@ -147,6 +153,15 @@ func (d *Distributor) getIngesterReplicationSetsForQuery(ctx context.Context) ([
return []ring.ReplicationSet{replicationSet}, nil
}

func (d *Distributor) tenantConsistencyLevel(userID string) (ring.ConsistencyLevel, error) {
consistencyLevelStr := d.cfg.IngestReadConsistency
if userConsistencyLevelStr := d.limits.ReadConsistencyLevel(userID); userConsistencyLevelStr != "" {
consistencyLevelStr = userConsistencyLevelStr
}

return ring.ConvertConsistencyLevel(consistencyLevelStr)
}

// mergeExemplarSets merges and dedupes two sets of already sorted exemplar pairs.
// Both a and b should be lists of exemplars from the same series.
// Defined here instead of pkg/util to avoid a import cycle.
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/ingester_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type RingConfig struct {
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
ReadConsistencyLevel string `yaml:"read_consistency_level" category:"advanced"`
ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones" category:"advanced"`

// Tokens
Expand Down Expand Up @@ -115,6 +116,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.HeartbeatTimeout, prefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled)."+sharedOptionWithRingClient)
f.IntVar(&cfg.ReplicationFactor, prefix+"replication-factor", 3, "Number of ingesters that each time series is replicated to."+sharedOptionWithRingClient)
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones."+sharedOptionWithRingClient)
f.StringVar(&cfg.ReadConsistencyLevel, prefix+"read-consistency-level", "quorum", "Specifies the consistency level used to read from ingester. Supported values are: quorum, any, relaxed_quorum.")
f.Var(&cfg.ExcludedZones, prefix+"excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring."+sharedOptionWithRingClient)

f.StringVar(&cfg.TokensFilePath, prefix+flagTokensFilePath, "", fmt.Sprintf("File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup. Must be empty if -%s is set to %q.", prefix+flagTokenGenerationStrategy, tokenGenerationSpreadMinimizing))
Expand Down
1 change: 1 addition & 0 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.MinimiseIngesterRequestsHedgingDelay = t.Cfg.Querier.MinimiseIngesterRequestsHedgingDelay
t.Cfg.Distributor.PreferAvailabilityZone = t.Cfg.Querier.PreferAvailabilityZone
t.Cfg.Distributor.IngestStorageConfig = t.Cfg.IngestStorage
t.Cfg.Distributor.IngestReadConsistency = t.Cfg.Ingester.IngesterRing.ReadConsistencyLevel

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides,
t.ActiveGroupsCleanup, t.CostAttributionManager, t.IngesterRing, t.IngesterPartitionInstanceRing,
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type Limits struct {
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs. Labels available during the relabeling phase and cleaned afterwards: __meta_tenant_id" category:"experimental"`
MetricRelabelingEnabled bool `yaml:"metric_relabeling_enabled" json:"metric_relabeling_enabled" category:"experimental"`
ServiceOverloadStatusCodeOnRateLimitEnabled bool `yaml:"service_overload_status_code_on_rate_limit_enabled" json:"service_overload_status_code_on_rate_limit_enabled" category:"experimental"`
ReadConsistencyLevel string `yaml:"read_consistency_level" json:"read_consistency_level" category:"advanced"`

IngestionArtificialDelay model.Duration `yaml:"ingestion_artificial_delay" json:"ingestion_artificial_delay" category:"experimental" doc:"hidden"`
IngestionArtificialDelayConditionForTenantsWithLessThanMaxSeries int `yaml:"ingestion_artificial_delay_condition_for_tenants_with_less_than_max_series" json:"ingestion_artificial_delay_condition_for_tenants_with_less_than_max_series" category:"experimental" doc:"hidden"`
Expand Down Expand Up @@ -283,6 +284,7 @@ type Limits struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The tenant's shard size used by shuffle-sharding. This value is the total size of the shard (ie. it is not the number of ingesters in the shard per zone, but the number of ingesters in the shard across all zones, if zone-awareness is enabled). Must be set both on ingesters and distributors. 0 disables shuffle sharding.")
f.StringVar(&l.ReadConsistencyLevel, "distributor.read-consistency-level", "", "The tenant's read consistency level to use for the distributor. Possible values are: quorum, one, all.")
f.Float64Var(&l.RequestRate, RequestRateFlag, 0, "Per-tenant push request rate limit in requests per second. 0 to disable.")
f.IntVar(&l.RequestBurstSize, RequestBurstSizeFlag, 0, "Per-tenant allowed push request burst size. 0 to disable.")
f.Float64Var(&l.IngestionRate, IngestionRateFlag, 10000, "Per-tenant ingestion rate limit in samples per second.")
Expand Down Expand Up @@ -905,6 +907,11 @@ func (o *Overrides) IngestionTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).IngestionTenantShardSize
}

// ReadConsistencyLevel returns the read consistency level for a given user.
func (o *Overrides) ReadConsistencyLevel(userID string) string {
return o.getOverridesForUser(userID).ReadConsistencyLevel
}

// CompactorTenantShardSize returns number of compactors that this user can use. 0 = all compactors.
func (o *Overrides) CompactorTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).CompactorTenantShardSize
Expand Down
17 changes: 12 additions & 5 deletions vendor/github.com/grafana/dskit/grpcclient/instrumentation.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions vendor/github.com/grafana/dskit/httpgrpc/server/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading