1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
* [FEATURE] Query Frontend: Add support for the /api/v1/format_query API for formatting queries. #6893
* [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation. #7003
* [ENHANCEMENT] Modernize the entire codebase by using the Go `modernize` tool. #7005
* [ENHANCEMENT] Overrides Exporter: Expose all fields that can be converted to float64. Also, the label value `max_local_series_per_metric` got renamed to `max_series_per_metric`, and `max_local_series_per_user` got renamed to `max_series_per_user`. #6979
* [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_wal_replay_unknown_refs_total` and `cortex_ingester_tsdb_wbl_replay_unknown_refs_total` metrics to track unknown series references during wal/wbl replaying. #6945
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4252,6 +4252,11 @@ query_rejection:
# CLI flag: -parquet-converter.tenant-shard-size
[parquet_converter_tenant_shard_size: <float> | default = 0]

# Additional label names to sort by after the metric name, in order of
# precedence. These are applied per tenant during Parquet file generation.
# CLI flag: -parquet-converter.sort-columns
[parquet_converter_sort_columns: <list of string> | default = []]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
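
The new `parquet_converter_sort_columns` option can also be supplied on the command line. Since the flag is registered as a `flagext.StringSlice` (see the `RegisterFlags` change in `pkg/util/validation/limits.go` below), each occurrence of the flag appends one column. A sketch, assuming the converter runs as its own target:

```
cortex -target=parquet-converter \
  -parquet-converter.sort-columns=cluster \
  -parquet-converter.sort-columns=namespace
```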
5 changes: 5 additions & 0 deletions docs/guides/parquet-mode.md
@@ -106,6 +106,9 @@ limits:

# Shard size for shuffle sharding (0 = disabled)
parquet_converter_tenant_shard_size: 0.8

# Additional sort columns applied after the metric name during Parquet file generation
parquet_converter_sort_columns: ["label1", "label2"]
```

You can also configure per-tenant settings using runtime configuration:
@@ -115,6 +118,7 @@ overrides:
tenant-1:
parquet_converter_enabled: true
parquet_converter_tenant_shard_size: 2
parquet_converter_sort_columns: ["cluster", "namespace"]
tenant-2:
parquet_converter_enabled: false
```
@@ -280,6 +284,7 @@ cortex_parquet_queryable_cache_misses_total
1. **Row Group Size**: Adjust `max_rows_per_row_group` based on your query patterns
2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory
3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance
4. **Sort Columns**: Configure `parquet_converter_sort_columns` based on your most common query filters to improve query performance (see the example below)
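
For instance, if most dashboards filter on `cluster` and `namespace` (example label names, not defaults), sorting by them after the metric name keeps matching series adjacent in the file, so selectors such as `up{cluster="prod", namespace="ingress"}` should generally touch fewer row groups:

```yaml
# Illustrative per-tenant override; label names are examples only.
overrides:
  tenant-1:
    parquet_converter_sort_columns: ["cluster", "namespace"]
```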

### Fallback Configuration

6 changes: 5 additions & 1 deletion pkg/parquetconverter/converter.go
@@ -139,7 +139,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
metrics: newMetrics(registerer),
bkt: bkt,
baseConverterOptions: []convert.ConvertOption{
convert.WithSortBy(labels.MetricName),
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
},
@@ -430,6 +429,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin

converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))

// The metric name is always the primary sort column; any per-tenant
// columns configured via overrides follow it, in order of precedence.
sortColumns := []string{labels.MetricName}
userConfiguredSortColumns := c.limits.ParquetConverterSortColumns(userID)
sortColumns = append(sortColumns, userConfiguredSortColumns...)
converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...))

if c.cfg.FileBufferEnabled {
converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
}
34 changes: 31 additions & 3 deletions pkg/parquetconverter/converter_test.go
@@ -59,7 +59,19 @@ func TestConverter(t *testing.T) {
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
userSpecificSortColumns := []string{"cluster", "namespace"}

// Create a mock tenant limits implementation
tenantLimits := &mockTenantLimits{
limits: map[string]*validation.Limits{
user: {
ParquetConverterSortColumns: userSpecificSortColumns,
ParquetConverterEnabled: true,
},
},
}

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, tenantLimits)

ctx := context.Background()

@@ -157,7 +169,7 @@ func prepareConfig() Config {
return cfg
}

func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Converter, log.Logger, prometheus.Gatherer) {
func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
flagext.DefaultValues(&storageCfg)
@@ -176,7 +188,7 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket,
flagext.DefaultValues(limits)
}

overrides := validation.NewOverrides(*limits, nil)
overrides := validation.NewOverrides(*limits, tenantLimits)

scanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{
Strategy: cortex_tsdb.UserScanStrategyList,
@@ -384,3 +396,19 @@ func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDe
},
}, nil
}

// mockTenantLimits implements the validation.TenantLimits interface for testing
type mockTenantLimits struct {
limits map[string]*validation.Limits
}

func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
if limits, ok := m.limits[userID]; ok {
return limits
}
return nil
}

func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
return m.limits
}
12 changes: 9 additions & 3 deletions pkg/util/validation/limits.go
@@ -220,9 +220,9 @@ type Limits struct {
CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

// Parquet converter
ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"`
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"`

ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"`
ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"`
ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"`
// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."`
@@ -325,6 +325,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")
f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names to sort by after the metric name, in order of precedence. These are applied per tenant during Parquet file generation.")

// Parquet Queryable enforced limits.
f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.")
@@ -903,6 +904,11 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool {
return o.GetOverridesForUser(userID).ParquetConverterEnabled
}

// ParquetConverterSortColumns returns the additional sort columns for parquet files.
func (o *Overrides) ParquetConverterSortColumns(userID string) []string {
return o.GetOverridesForUser(userID).ParquetConverterSortColumns
}

// ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage.
func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int {
return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount
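A minimal sketch of how the new override is consumed end to end, mirroring the test changes above (`tenantLimits` stands for any `validation.TenantLimits` implementation, such as the test's `mockTenantLimits`; this wiring is illustrative, not part of the PR):

```go
// Flag defaults: parquet_converter_sort_columns defaults to an empty list.
limits := validation.Limits{}
flagext.DefaultValues(&limits)

// Per-tenant runtime overrides take precedence over the flag defaults.
overrides := validation.NewOverrides(limits, tenantLimits)

// Returns the tenant's configured columns, or the (empty) default.
// The converter then prepends labels.MetricName before calling WithSortBy.
sortColumns := overrides.ParquetConverterSortColumns("tenant-1")
_ = sortColumns
```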