diff --git a/model/labels/labels_dedupelabels_struct_size.go b/model/labels/labels_dedupelabels_struct_size.go new file mode 100644 index 000000000..dad9dba37 --- /dev/null +++ b/model/labels/labels_dedupelabels_struct_size.go @@ -0,0 +1,18 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build dedupelabels + +package labels + +const LabelsStructSize uint8 = 24 diff --git a/model/labels/labels_struct_size.go b/model/labels/labels_struct_size.go new file mode 100644 index 000000000..de8ff3ebf --- /dev/null +++ b/model/labels/labels_struct_size.go @@ -0,0 +1,18 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build !stringlabels && !dedupelabels + +package labels + +const LabelsStructSize uint8 = 24 diff --git a/model/labels/labels_struct_size_stringlabels.go b/model/labels/labels_struct_size_stringlabels.go new file mode 100644 index 000000000..3efb20a73 --- /dev/null +++ b/model/labels/labels_struct_size_stringlabels.go @@ -0,0 +1,18 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build stringlabels + +package labels + +const LabelsStructSize uint8 = 16 diff --git a/pp/entrypoint/go_constants.h b/pp/entrypoint/go_constants.h index ec74157b1..3bcca3c0b 100644 --- a/pp/entrypoint/go_constants.h +++ b/pp/entrypoint/go_constants.h @@ -3,5 +3,6 @@ #define Sizeof_BareBonesVector 16 #define Sizeof_RoaringBitset 40 #define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset) +#define Sizeof_GoLabels 16 /* NOTE(review): equals labels.LabelsStructSize only under the stringlabels build tag (24 otherwise); confirm other builds are guarded */ #define Sizeof_SerializedDataIterator 192 diff --git a/pp/entrypoint/series_data/querier.h b/pp/entrypoint/series_data/querier.h index f67ca505e..9ecb4dae2 100644 --- a/pp/entrypoint/series_data/querier.h +++ b/pp/entrypoint/series_data/querier.h @@ -1,6 +1,7 @@ #pragma once #include "bare_bones/bitset.h" +#include "entrypoint/go_constants.h" #include "primitives/go_slice.h" #include "primitives/primitives.h" #include "series_data/querier/instant_querier.h" @@ -47,8 +48,13 @@ class InstantQuerierWithArgumentsWrapper { const Timestamp timestamp_; }; +struct SampleWithGoLabels : public 
::series_data::encoder::Sample { + private: + char go_labels_[Sizeof_GoLabels]; +}; + using InstantQuerierWithArgumentsWrapperEntrypoint = InstantQuerierWithArgumentsWrapper, - PromPP::Primitives::Go::SliceView<::series_data::encoder::Sample>>; + std::span>; class RangeQuerierWithArgumentsWrapper { using DataStorage = ::series_data::DataStorage; diff --git a/pp/entrypoint/series_data_data_storage.cpp b/pp/entrypoint/series_data_data_storage.cpp index 0cb19dd77..2cbf07142 100644 --- a/pp/entrypoint/series_data_data_storage.cpp +++ b/pp/entrypoint/series_data_data_storage.cpp @@ -177,13 +177,12 @@ extern "C" void prompp_series_data_data_storage_instant_query(void* args, void* using entrypoint::series_data::InstantQuerierWithArgumentsWrapperEntrypoint; using PromPP::Primitives::Timestamp; using series_data::InstantQuerier; - using series_data::encoder::Sample; struct Arguments { DataStoragePtr data_storage; SliceView label_set_ids; Timestamp timestamp; - SliceView samples; + entrypoint::series_data::SampleWithGoLabels* samples; }; using Result = struct { @@ -193,7 +192,8 @@ extern "C" void prompp_series_data_data_storage_instant_query(void* args, void* const auto in = static_cast(args); - InstantQuerierWithArgumentsWrapperEntrypoint instant_querier(*in->data_storage, in->label_set_ids, in->timestamp, in->samples); + auto samples = std::span(in->samples, in->label_set_ids.size()); + InstantQuerierWithArgumentsWrapperEntrypoint instant_querier(*in->data_storage, in->label_set_ids, in->timestamp, samples); instant_querier.query(); if (instant_querier.need_loading()) { diff --git a/pp/entrypoint/series_data_data_storage.h b/pp/entrypoint/series_data_data_storage.h index fcc4474de..137977c64 100644 --- a/pp/entrypoint/series_data_data_storage.h +++ b/pp/entrypoint/series_data_data_storage.h @@ -126,16 +126,14 @@ void prompp_series_data_data_storage_query(void* args, void* res); void prompp_series_data_data_storage_query_v2(void* args, void* res); /** - * @brief return 
samples at given timestamp for label sets. + * @brief return instant series at given timestamp for label sets. * * @param args { - * dataStorage uintptr // pointer to constructed data storage - * labelSetIDs []uint32 // series ids - * timestamp int64 // timestamp - * samples []struct { // pre-allocated samples slice - * timestamp int64 - * value float64 - * } + * dataStorage uintptr // pointer to constructed data storage + * labelSetIDs []uint32 // series ids + * timestamp int64 // timestamp + * samples uintptr // pointer to the first of len(labelSetIDs) caller-allocated SampleWithGoLabels elements + * } * @param res { * InstantQuerier uintptr // pointer to constructed Querier if data loading is needed * Status uint8 // status of a query (0 - Success, 1 - Data loading is needed) diff --git a/pp/go/cppbridge/data_storage.go b/pp/go/cppbridge/data_storage.go new file mode 100644 index 000000000..192da5550 --- /dev/null +++ b/pp/go/cppbridge/data_storage.go @@ -0,0 +1,121 @@ +package cppbridge + +import ( + "runtime" +) + +// DataStorage is a Go wrapper around series_data::DataStorage. +type DataStorage struct { + dataStorage uintptr + gcDestroyDetector *uint64 + timeInterval TimeInterval +} + +// NewDataStorage creates a DataStorage and registers a finalizer that calls seriesDataDataStorageDtor on it. +func NewDataStorage() *DataStorage { + ds := &DataStorage{ + dataStorage: seriesDataDataStorageCtor(), + gcDestroyDetector: &gcDestroyDetector, + timeInterval: NewInvalidTimeInterval(), + } + + runtime.SetFinalizer(ds, func(ds *DataStorage) { + seriesDataDataStorageDtor(ds.dataStorage) + }) + + return ds +} + +// Reset resets the data storage and invalidates the cached time interval. 
+func (ds *DataStorage) Reset() { + seriesDataDataStorageReset(ds.dataStorage) + ds.timeInterval = NewInvalidTimeInterval() +} + +func (ds *DataStorage) TimeInterval(invalidateCache bool) TimeInterval { + if invalidateCache || ds.timeInterval.IsInvalid() { + ds.timeInterval = seriesDataDataStorageTimeInterval(ds.dataStorage) + runtime.KeepAlive(ds) + } + + return ds.timeInterval +} + +func (ds *DataStorage) GetQueriedSeriesBitset() []byte { + size := seriesDataDataStorageQueriedSeriesBitsetSize(ds.dataStorage) + bitset := seriesDataDataStorageQueriedSeriesBitset(ds.dataStorage, make([]byte, 0, size)) + runtime.KeepAlive(ds) + return bitset +} + +func (ds *DataStorage) SetQueriedSeriesBitset(bitset []byte) bool { + result := seriesDataDataStorageQueriedSeriesSetBitset(ds.dataStorage, bitset) + runtime.KeepAlive(ds) + return result +} + +func (ds *DataStorage) Pointer() uintptr { + return ds.dataStorage +} + +func (ds *DataStorage) AllocatedMemory() uint64 { + res := seriesDataDataStorageAllocatedMemory(ds.dataStorage) + runtime.KeepAlive(ds) + return res +} + +type UnusedSeriesDataUnloader struct { + unloader uintptr + ds *DataStorage +} + +func (u *UnusedSeriesDataUnloader) CreateSnapshot() []byte { + snapshot := seriesDataUnusedSeriesDataUnloaderCreateSnapshot(u.unloader) + runtime.KeepAlive(u) + return snapshot +} + +func (u *UnusedSeriesDataUnloader) Unload() { + seriesDataUnusedSeriesDataUnloaderUnload(u.unloader) + runtime.KeepAlive(u) +} + +func (ds *DataStorage) CreateUnusedSeriesDataUnloader() *UnusedSeriesDataUnloader { + unloader := &UnusedSeriesDataUnloader{ + unloader: seriesDataUnusedSeriesDataUnloaderCtor(ds.dataStorage), + ds: ds, + } + + runtime.SetFinalizer(unloader, func(u *UnusedSeriesDataUnloader) { + seriesDataUnusedSeriesDataUnloaderDtor(u.unloader) + }) + + return unloader +} + +type DataStorageQuery struct { + StartTimestampMs int64 + EndTimestampMs int64 + LabelSetIDs []uint32 +} + +func (ds *DataStorage) Query(query DataStorageQuery) 
DataStorageQueryResult { + sd := NewDataStorageSerializedData() + querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd) + return DataStorageQueryResult{ + Querier: querier, + Status: status, + SerializedData: sd, + } +} + +// InstantQuery runs an instant query for labelSetIDs at targetTimestamp; samples is the address of an array with one element per labelSetIDs entry and is forwarded to the C++ side as-is. +// TODO(review): this was tagged deprecated without naming a replacement; confirm, and if so document it in a separate paragraph that names the replacement. +func (ds *DataStorage) InstantQuery(targetTimestamp int64, labelSetIDs []uint32, samples uintptr) DataStorageQueryResult { + return seriesDataDataStorageInstantQuery(ds.dataStorage, labelSetIDs, targetTimestamp, samples) +} + +func (ds *DataStorage) QueryFinal(queriers []uintptr) { + seriesDataDataStorageQueryFinal(queriers) + runtime.KeepAlive(queriers) +} diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index 7846ee9a9..a9427d25e 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -34,6 +34,10 @@ type ( CppSerializedDataIterator = [C.Sizeof_SerializedDataIterator]byte ) +const ( + GoLabelsSize = C.Sizeof_GoLabels // byte size the C++ side reserves for go_labels_ in SampleWithGoLabels +) + var ( // per_goroutine_relabeler input_relabeling @@ -1856,10 +1860,10 @@ type DataStorageQueryResult struct { SerializedData *DataStorageSerializedData } -func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData) (querier uintptr, status uint8) { +func seriesDataDataStorageQueryV2(dataStorage uintptr, query DataStorageQuery, serializedData *DataStorageSerializedData) (querier uintptr, status uint8) { args := struct { dataStorage uintptr - query HeadDataStorageQuery + query DataStorageQuery }{dataStorage, query} var res = struct { @@ -1883,14 +1887,13 @@ func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuer return res.Querier, res.Status } -func seriesDataDataStorageInstantQuery(dataStorage uintptr, labelSetIDs []uint32, timestamp int64, samples []Sample) DataStorageQueryResult { +func seriesDataDataStorageInstantQuery(dataStorage uintptr, labelSetIDs []uint32, timestamp int64, samples uintptr) 
DataStorageQueryResult { args := struct { dataStorage uintptr labelSetIDs []uint32 timestamp int64 - samples []Sample + samples uintptr }{dataStorage, labelSetIDs, timestamp, samples} - var res DataStorageQueryResult testGC() diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index 2d1b7061a..8524de0b8 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -38,6 +38,7 @@ void prompp_dump_memory_profile(void* args, void* res); #define Sizeof_BareBonesVector 16 #define Sizeof_RoaringBitset 40 #define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset) +#define Sizeof_GoLabels 16 /* NOTE(review): equals labels.LabelsStructSize only under the stringlabels build tag (24 otherwise); confirm other builds are guarded */ #define Sizeof_SerializedDataIterator 192 #ifdef __cplusplus @@ -1361,16 +1362,14 @@ void prompp_series_data_data_storage_query(void* args, void* res); void prompp_series_data_data_storage_query_v2(void* args, void* res); /** - * @brief return samples at given timestamp for label sets. + * @brief return instant series at given timestamp for label sets. * * @param args { - * dataStorage uintptr // pointer to constructed data storage - * labelSetIDs []uint32 // series ids - * timestamp int64 // timestamp - * samples []struct { // pre-allocated samples slice - * timestamp int64 - * value float64 - * } + * dataStorage uintptr // pointer to constructed data storage + * labelSetIDs []uint32 // series ids + * timestamp int64 // timestamp + * samples uintptr // pointer to the first of len(labelSetIDs) caller-allocated SampleWithGoLabels elements + * } * @param res { * InstantQuerier uintptr // pointer to constructed Querier if data loading is needed * Status uint8 // status of a query (0 - Success, 1 - Data loading is needed) diff --git a/pp/go/cppbridge/head.go b/pp/go/cppbridge/head.go index ae8d67c2e..69ac322a1 100644 --- a/pp/go/cppbridge/head.go +++ b/pp/go/cppbridge/head.go @@ -46,103 +46,14 @@ type Sample struct { Value float64 } -// HeadDataStorage is Go wrapper around series_data::Data_storage. 
-type HeadDataStorage struct { - dataStorage uintptr - gcDestroyDetector *uint64 - timeInterval TimeInterval -} - -// NewHeadDataStorage - constructor. -func NewHeadDataStorage() *HeadDataStorage { - ds := &HeadDataStorage{ - dataStorage: seriesDataDataStorageCtor(), - gcDestroyDetector: &gcDestroyDetector, - timeInterval: NewInvalidTimeInterval(), - } - - runtime.SetFinalizer(ds, func(ds *HeadDataStorage) { - seriesDataDataStorageDtor(ds.dataStorage) - }) - - return ds -} - -// Reset - resets data storage. -func (ds *HeadDataStorage) Reset() { - seriesDataDataStorageReset(ds.dataStorage) - ds.timeInterval = NewInvalidTimeInterval() -} - -func (ds *HeadDataStorage) TimeInterval(invalidateCache bool) TimeInterval { - if invalidateCache || ds.timeInterval.IsInvalid() { - ds.timeInterval = seriesDataDataStorageTimeInterval(ds.dataStorage) - runtime.KeepAlive(ds) - } - - return ds.timeInterval -} - -func (ds *HeadDataStorage) GetQueriedSeriesBitset() []byte { - size := seriesDataDataStorageQueriedSeriesBitsetSize(ds.dataStorage) - bitset := seriesDataDataStorageQueriedSeriesBitset(ds.dataStorage, make([]byte, 0, size)) - runtime.KeepAlive(ds) - return bitset -} - -func (ds *HeadDataStorage) SetQueriedSeriesBitset(bitset []byte) bool { - result := seriesDataDataStorageQueriedSeriesSetBitset(ds.dataStorage, bitset) - runtime.KeepAlive(ds) - return result -} - -func (ds *HeadDataStorage) Pointer() uintptr { - return ds.dataStorage -} - -func (ds *HeadDataStorage) AllocatedMemory() uint64 { - res := seriesDataDataStorageAllocatedMemory(ds.dataStorage) - runtime.KeepAlive(ds) - return res -} - -type UnusedSeriesDataUnloader struct { - unloader uintptr - ds *HeadDataStorage -} - -func (u *UnusedSeriesDataUnloader) CreateSnapshot() []byte { - snapshot := seriesDataUnusedSeriesDataUnloaderCreateSnapshot(u.unloader) - runtime.KeepAlive(u) - return snapshot -} - -func (u *UnusedSeriesDataUnloader) Unload() { - seriesDataUnusedSeriesDataUnloaderUnload(u.unloader) - 
runtime.KeepAlive(u) -} - -func (ds *HeadDataStorage) CreateUnusedSeriesDataUnloader() *UnusedSeriesDataUnloader { - unloader := &UnusedSeriesDataUnloader{ - unloader: seriesDataUnusedSeriesDataUnloaderCtor(ds.dataStorage), - ds: ds, - } - - runtime.SetFinalizer(unloader, func(u *UnusedSeriesDataUnloader) { - seriesDataUnusedSeriesDataUnloaderDtor(u.unloader) - }) - - return unloader -} - // HeadEncoder is Go wrapper around series_data::Encoder. type HeadEncoder struct { encoder uintptr - dataStorage *HeadDataStorage + dataStorage *DataStorage } // NewHeadEncoderWithDataStorage - constructor. -func NewHeadEncoderWithDataStorage(dataStorage *HeadDataStorage) *HeadEncoder { +func NewHeadEncoderWithDataStorage(dataStorage *DataStorage) *HeadEncoder { encoder := &HeadEncoder{ encoder: seriesDataEncoderCtor(dataStorage.dataStorage), dataStorage: dataStorage, @@ -157,7 +68,7 @@ func NewHeadEncoderWithDataStorage(dataStorage *HeadDataStorage) *HeadEncoder { // NewHeadEncoder - constructor. func NewHeadEncoder() *HeadEncoder { - return NewHeadEncoderWithDataStorage(NewHeadDataStorage()) + return NewHeadEncoderWithDataStorage(NewDataStorage()) } // Encode - encodes single triplet. 
@@ -195,11 +106,11 @@ type ChunkRecoder struct { recodedChunk RecodedChunk lss *LabelSetStorage - dataStorage *HeadDataStorage + dataStorage *DataStorage serializedData *DataStorageSerializedData } -func NewChunkRecoder(lss *LabelSetStorage, lsIdBatchSize uint32, dataStorage *HeadDataStorage, timeInterval TimeInterval) *ChunkRecoder { +func NewChunkRecoder(lss *LabelSetStorage, lsIdBatchSize uint32, dataStorage *DataStorage, timeInterval TimeInterval) *ChunkRecoder { return initializeChunkRecoder(lss, dataStorage, nil, seriesDataChunkRecoderCtor(lss.Pointer(), lsIdBatchSize, dataStorage.dataStorage, timeInterval)) } @@ -209,7 +120,7 @@ func NewSerializedChunkRecoder(serializedData *DataStorageSerializedData, timeIn func initializeChunkRecoder( lss *LabelSetStorage, - dataStorage *HeadDataStorage, + dataStorage *DataStorage, serializedData *DataStorageSerializedData, recoder uintptr, ) *ChunkRecoder { @@ -238,12 +149,6 @@ func (recoder *ChunkRecoder) NextBatch() bool { return result } -type HeadDataStorageQuery struct { - StartTimestampMs int64 - EndTimestampMs int64 - LabelSetIDs []uint32 -} - func getSeriesIDFromBytes(data []byte) uint32 { return *(*uint32)(unsafe.Pointer(&data[0])) // #nosec G103 // it's meant to be that way } @@ -314,31 +219,6 @@ func (i HeadDataStorageSerializedChunkIndex) Chunks(r *HeadDataStorageSerialized return res } -func (ds *HeadDataStorage) Query(query HeadDataStorageQuery) DataStorageQueryResult { - sd := NewDataStorageSerializedData() - querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd) - return DataStorageQueryResult{ - Querier: querier, - Status: status, - SerializedData: sd, - } -} - -func (ds *HeadDataStorage) InstantQuery(targetTimestamp, defaultTimestamp int64, labelSetIDs []uint32) ([]Sample, DataStorageQueryResult) { - samples := make([]Sample, len(labelSetIDs)) - if defaultTimestamp != 0 { - for index := range samples { - samples[index].Timestamp = defaultTimestamp - } - } - return samples, 
seriesDataDataStorageInstantQuery(ds.dataStorage, labelSetIDs, targetTimestamp, samples) -} - -func (ds *HeadDataStorage) QueryFinal(queriers []uintptr) { - seriesDataDataStorageQueryFinal(queriers) - runtime.KeepAlive(queriers) -} - type DataStorageSerializedData struct { serializedData uintptr } @@ -392,7 +272,7 @@ func (it *DataStorageSerializedDataIterator) HasData() bool { // UnloadedDataLoader is Go wrapper around series_data::Loader. type UnloadedDataLoader struct { loader uintptr - ds *HeadDataStorage + ds *DataStorage } func (loader *UnloadedDataLoader) Load(snapshot []byte, isLast bool) { @@ -400,7 +280,7 @@ func (loader *UnloadedDataLoader) Load(snapshot []byte, isLast bool) { runtime.KeepAlive(loader) } -func (ds *HeadDataStorage) CreateLoader(queriers []uintptr) *UnloadedDataLoader { +func (ds *DataStorage) CreateLoader(queriers []uintptr) *UnloadedDataLoader { result := &UnloadedDataLoader{ loader: seriesDataUnloadedDataLoaderCtor(ds.dataStorage, queriers), ds: ds, @@ -426,7 +306,7 @@ func (loader *UnloadedDataRevertableLoader) NextBatch() bool { return result } -func (ds *HeadDataStorage) CreateRevertableLoader(lss *LabelSetStorage, lsIdBatchSize uint32) *UnloadedDataRevertableLoader { +func (ds *DataStorage) CreateRevertableLoader(lss *LabelSetStorage, lsIdBatchSize uint32) *UnloadedDataRevertableLoader { result := &UnloadedDataRevertableLoader{ UnloadedDataLoader: UnloadedDataLoader{ loader: seriesDataUnloadedDataRevertableLoaderCtor(lss.pointer, lsIdBatchSize, ds.dataStorage), diff --git a/pp/go/cppbridge/head_status.go b/pp/go/cppbridge/head_status.go index 9029863ba..402102923 100644 --- a/pp/go/cppbridge/head_status.go +++ b/pp/go/cppbridge/head_status.go @@ -48,6 +48,6 @@ func (s *HeadStatus) FromLSS(lss *LabelSetStorage, limit int) { } // FromDataStorage get head status from data storage. 
-func (s *HeadStatus) FromDataStorage(dataStorage *HeadDataStorage) { +func (s *HeadStatus) FromDataStorage(dataStorage *DataStorage) { getHeadStatusDataStorage(dataStorage.dataStorage, s) } diff --git a/pp/go/cppbridge/head_status_test.go b/pp/go/cppbridge/head_status_test.go index 8ccfd83d2..b0ea93f68 100644 --- a/pp/go/cppbridge/head_status_test.go +++ b/pp/go/cppbridge/head_status_test.go @@ -10,7 +10,7 @@ import ( type HeadStatusSuite struct { suite.Suite - dataStorage *cppbridge.HeadDataStorage + dataStorage *cppbridge.DataStorage encoder *cppbridge.HeadEncoder lssStorage *cppbridge.LabelSetStorage limit int @@ -21,7 +21,7 @@ func TestHeadStatusSuite(t *testing.T) { } func (s *HeadStatusSuite) SetupTest() { - s.dataStorage = cppbridge.NewHeadDataStorage() + s.dataStorage = cppbridge.NewDataStorage() s.encoder = cppbridge.NewHeadEncoderWithDataStorage(s.dataStorage) s.lssStorage = cppbridge.NewQueryableLssStorage() s.limit = 10 diff --git a/pp/go/cppbridge/head_test.go b/pp/go/cppbridge/head_test.go index a9cd2942c..f0665f106 100644 --- a/pp/go/cppbridge/head_test.go +++ b/pp/go/cppbridge/head_test.go @@ -1,7 +1,9 @@ package cppbridge_test import ( + "github.com/prometheus/prometheus/pp/go/storage/querier" "testing" + "unsafe" "github.com/stretchr/testify/require" @@ -13,7 +15,7 @@ import ( type HeadSuite struct { suite.Suite lss *cppbridge.LabelSetStorage - dataStorage *cppbridge.HeadDataStorage + dataStorage *cppbridge.DataStorage encoder *cppbridge.HeadEncoder } @@ -23,7 +25,7 @@ func TestHeadSuite(t *testing.T) { func (s *HeadSuite) SetupTest() { s.lss = cppbridge.NewQueryableLssStorage() - s.dataStorage = cppbridge.NewHeadDataStorage() + s.dataStorage = cppbridge.NewDataStorage() s.encoder = cppbridge.NewHeadEncoderWithDataStorage(s.dataStorage) } @@ -118,7 +120,7 @@ func (s *HeadSuite) TestSerializedChunkRecoder() { s.encoder.Encode(1, 4, 2.0) timeInterval := cppbridge.TimeInterval{MinT: 0, MaxT: 4} - result := 
s.dataStorage.Query(cppbridge.HeadDataStorageQuery{ + result := s.dataStorage.Query(cppbridge.DataStorageQuery{ StartTimestampMs: timeInterval.MinT, EndTimestampMs: timeInterval.MaxT, LabelSetIDs: []uint32{0, 1}}, @@ -156,7 +158,7 @@ func (s *HeadSuite) TestSerializedChunkRecoder() { func (s *HeadSuite) TestTimeInterval() { // Arrange - dataStorage := cppbridge.NewHeadDataStorage() + dataStorage := cppbridge.NewDataStorage() encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage) encoder.Encode(0, 1, 1.0) encoder.Encode(0, 2, 1.0) @@ -177,7 +179,7 @@ func (s *HeadSuite) TestTimeInterval() { func (s *HeadSuite) TestInstantQuery() { // Arrange - dataStorage := cppbridge.NewHeadDataStorage() + dataStorage := cppbridge.NewDataStorage() encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage) var series = []struct { SeriesID uint32 @@ -200,16 +202,19 @@ func (s *HeadSuite) TestInstantQuery() { seriesIDs := []uint32{0, 1, 2, 3} targetTimestamp := int64(5) defaultTimestamp := int64(-1) + instantSeries := make([]querier.InstantSeries, len(seriesIDs)) + for i := range instantSeries { + instantSeries[i].Timestamp = defaultTimestamp + } // Act - samples, result := dataStorage.InstantQuery(targetTimestamp, defaultTimestamp, seriesIDs) + result := dataStorage.InstantQuery(targetTimestamp, seriesIDs, uintptr(unsafe.Pointer(unsafe.SliceData(instantSeries)))) // Assert require.Equal(s.T(), cppbridge.DataStorageQueryStatusSuccess, result.Status) - require.Len(s.T(), samples, 4) - s.Equal(defaultTimestamp, samples[0].Timestamp) - s.Equal(series[2].Sample, samples[1]) - s.Equal(series[5].Sample, samples[2]) - s.Equal(series[6].Sample, samples[3]) + s.Equal(defaultTimestamp, instantSeries[0].Timestamp) + s.Equal(series[2].Sample, cppbridge.Sample{Timestamp: instantSeries[1].Timestamp, Value: instantSeries[1].Value}) + s.Equal(series[5].Sample, cppbridge.Sample{Timestamp: instantSeries[2].Timestamp, Value: instantSeries[2].Value}) + s.Equal(series[6].Sample, 
cppbridge.Sample{Timestamp: instantSeries[3].Timestamp, Value: instantSeries[3].Value}) } diff --git a/pp/go/cppbridge/head_wal_test.go b/pp/go/cppbridge/head_wal_test.go index d81d6d80c..f9962d1d6 100644 --- a/pp/go/cppbridge/head_wal_test.go +++ b/pp/go/cppbridge/head_wal_test.go @@ -78,7 +78,7 @@ func TestHeadWalDecoder_DecodeToDataStorage(t *testing.T) { // Arrange const kTestBufferVersion = 3 - dataStorage := cppbridge.NewHeadDataStorage() + dataStorage := cppbridge.NewDataStorage() encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage) decoder := cppbridge.NewHeadWalDecoder(cppbridge.NewQueryableLssStorage(), kTestBufferVersion) segment, _ := hex.DecodeString(hexSegment) diff --git a/pp/go/storage/appender/appender_test.go b/pp/go/storage/appender/appender_test.go index 1a9b0607a..fe1ace831 100644 --- a/pp/go/storage/appender/appender_test.go +++ b/pp/go/storage/appender/appender_test.go @@ -119,7 +119,7 @@ func (s *AppenderSuite) getHeadData(labelSetIDs []uint32) headStorageData { lssResult := sh.LSS().Target().GetLabelSets(labelSetIDs) data.lssResult = append(data.lssResult, lssResult) - dsResult := sh.DataStorage().Query(cppbridge.HeadDataStorageQuery{ + dsResult := sh.DataStorage().Query(cppbridge.DataStorageQuery{ StartTimestampMs: 0, EndTimestampMs: math.MaxInt64, LabelSetIDs: labelSetIDs, diff --git a/pp/go/storage/block/block.go b/pp/go/storage/block/block.go index 3b1bb834c..af353e7ef 100644 --- a/pp/go/storage/block/block.go +++ b/pp/go/storage/block/block.go @@ -55,7 +55,7 @@ type ChunkIterator struct { func NewChunkIterator( lss *cppbridge.LabelSetStorage, lsIdBatchSize uint32, - ds *cppbridge.HeadDataStorage, + ds *cppbridge.DataStorage, minT, maxT int64, ) ChunkIterator { return ChunkIterator{ diff --git a/pp/go/storage/block/writer.go b/pp/go/storage/block/writer.go index 271d78314..aa0e60939 100644 --- a/pp/go/storage/block/writer.go +++ b/pp/go/storage/block/writer.go @@ -93,7 +93,7 @@ func (w *Writer[TShard]) createWriters(sd 
TShard) (blockWriters, error) { } var chunkIterator ChunkIterator - _ = sd.DataStorage().WithRLock(func(*cppbridge.HeadDataStorage) error { + _ = sd.DataStorage().WithRLock(func(*cppbridge.DataStorage) error { chunkIterator = NewChunkIterator(sd.LSS().Target(), LsIdBatchSize, sd.DataStorage().Raw(), minT, maxT) return nil }) @@ -118,7 +118,7 @@ func (w *Writer[TShard]) createWriters(sd TShard) (blockWriters, error) { // recodeAndWriteChunks recodes and writes chunks for the shard. func (*Writer[TShard]) recodeAndWriteChunks(sd TShard, writers blockWriters) error { var loader *cppbridge.UnloadedDataRevertableLoader - _ = sd.DataStorage().WithRLock(func(*cppbridge.HeadDataStorage) error { + _ = sd.DataStorage().WithRLock(func(*cppbridge.DataStorage) error { loader = sd.DataStorage().CreateRevertableLoader(sd.LSS().Target(), LsIdBatchSize) return nil }) @@ -142,7 +142,7 @@ func (*Writer[TShard]) recodeAndWriteChunks(sd TShard, writers blockWriters) err for { var hasMoreData bool var err error - _ = sd.DataStorage().WithLock(func(*cppbridge.HeadDataStorage) error { + _ = sd.DataStorage().WithLock(func(*cppbridge.DataStorage) error { hasMoreData, err = loadData() return nil }) @@ -155,7 +155,7 @@ func (*Writer[TShard]) recodeAndWriteChunks(sd TShard, writers blockWriters) err return err } - if err = sd.DataStorage().WithRLock(func(*cppbridge.HeadDataStorage) error { + if err = sd.DataStorage().WithRLock(func(*cppbridge.DataStorage) error { return writers.recodeAndWriteChunksBatch() }); err != nil { return err diff --git a/pp/go/storage/head/shard/data_storage.go b/pp/go/storage/head/shard/data_storage.go index 0cb18e2ca..8bdcfab83 100644 --- a/pp/go/storage/head/shard/data_storage.go +++ b/pp/go/storage/head/shard/data_storage.go @@ -8,14 +8,14 @@ import ( // DataStorage samles storage with labels IDs. 
type DataStorage struct { - dataStorage *cppbridge.HeadDataStorage + dataStorage *cppbridge.DataStorage encoder *cppbridge.HeadEncoder locker sync.RWMutex } // NewDataStorage int new [DataStorage]. func NewDataStorage() *DataStorage { - dataStorage := cppbridge.NewHeadDataStorage() + dataStorage := cppbridge.NewDataStorage() return &DataStorage{ dataStorage: dataStorage, encoder: cppbridge.NewHeadEncoderWithDataStorage(dataStorage), @@ -48,16 +48,16 @@ func (ds *DataStorage) DecodeSegment(decoder *cppbridge.HeadWalDecoder, data []b return decoder.DecodeToDataStorage(data, ds.encoder) } -// InstantQuery make instant query to data storage and returns samples. +// InstantQuery makes an instant query to the data storage under the read lock; samples is forwarded as-is to the underlying storage, which is expected to fill it (not visible here). func (ds *DataStorage) InstantQuery( - targetTimestamp, notFoundValueTimestampValue int64, - seriesIDs []uint32, -) ([]cppbridge.Sample, cppbridge.DataStorageQueryResult) { + targetTimestamp int64, + ids []uint32, + samples uintptr, +) cppbridge.DataStorageQueryResult { ds.locker.RLock() - samples, res := ds.dataStorage.InstantQuery(targetTimestamp, notFoundValueTimestampValue, seriesIDs) + res := ds.dataStorage.InstantQuery(targetTimestamp, ids, samples) ds.locker.RUnlock() - - return samples, res + return res } func (ds *DataStorage) Encode(seriesID uint32, timestamp int64, value float64) { @@ -73,7 +73,7 @@ func (ds *DataStorage) MergeOutOfOrderChunks() { ds.locker.Unlock() } -func (ds *DataStorage) Query(query cppbridge.HeadDataStorageQuery) cppbridge.DataStorageQueryResult { +func (ds *DataStorage) Query(query cppbridge.DataStorageQuery) cppbridge.DataStorageQueryResult { ds.locker.RLock() result := ds.dataStorage.Query(query) ds.locker.RUnlock() @@ -94,13 +94,13 @@ func (ds *DataStorage) QueryStatus(status *cppbridge.HeadStatus) { ds.locker.RUnlock() } -// Raw returns raw [cppbridge.HeadDataStorage]. -func (ds *DataStorage) Raw() *cppbridge.HeadDataStorage { +// Raw returns raw [cppbridge.DataStorage]. 
+func (ds *DataStorage) Raw() *cppbridge.DataStorage { return ds.dataStorage } -// WithLock calls fn on raw [cppbridge.HeadDataStorage] with write lock. -func (ds *DataStorage) WithLock(fn func(ds *cppbridge.HeadDataStorage) error) error { +// WithLock calls fn on raw [cppbridge.DataStorage] with write lock. +func (ds *DataStorage) WithLock(fn func(ds *cppbridge.DataStorage) error) error { ds.locker.Lock() err := fn(ds.dataStorage) ds.locker.Unlock() @@ -108,8 +108,8 @@ func (ds *DataStorage) WithLock(fn func(ds *cppbridge.HeadDataStorage) error) er return err } -// WithRLock calls fn on raw [cppbridge.HeadDataStorage] with read lock. -func (ds *DataStorage) WithRLock(fn func(ds *cppbridge.HeadDataStorage) error) error { +// WithRLock calls fn on raw [cppbridge.DataStorage] with read lock. +func (ds *DataStorage) WithRLock(fn func(ds *cppbridge.DataStorage) error) error { ds.locker.RLock() err := fn(ds.dataStorage) ds.locker.RUnlock() diff --git a/pp/go/storage/head/shard/shard.go b/pp/go/storage/head/shard/shard.go index b0a7ee61d..2e683aaee 100644 --- a/pp/go/storage/head/shard/shard.go +++ b/pp/go/storage/head/shard/shard.go @@ -187,7 +187,7 @@ func (s *Shard) UnloadUnusedSeriesData() error { unloader := s.DataStorage().CreateUnusedSeriesDataUnloader() var snapshot, queriedSeries []byte - _ = s.DataStorage().WithRLock(func(*cppbridge.HeadDataStorage) error { + _ = s.DataStorage().WithRLock(func(*cppbridge.DataStorage) error { snapshot = unloader.CreateSnapshot() queriedSeries = s.DataStorage().GetQueriedSeriesBitset() return nil @@ -198,7 +198,7 @@ func (s *Shard) UnloadUnusedSeriesData() error { return fmt.Errorf("unable to write unloaded series data snapshot: %v", err) } - _ = s.DataStorage().WithLock(func(*cppbridge.HeadDataStorage) error { + _ = s.DataStorage().WithLock(func(*cppbridge.DataStorage) error { s.UnloadedDataStorage().WriteIndex(header) unloader.Unload() return nil @@ -216,7 +216,7 @@ func (s *Shard) LoadAndQuerySeriesData() (err error) { var 
queriers []uintptr s.loadAndQueryTask.Release(func(q []uintptr) { queriers = q - err = s.DataStorage().WithLock(func(*cppbridge.HeadDataStorage) error { + err = s.DataStorage().WithLock(func(*cppbridge.DataStorage) error { loader := s.DataStorage().CreateLoader(queriers) return s.UnloadedDataStorage().ForEachSnapshot(loader.Load) }) diff --git a/pp/go/storage/loader_test.go b/pp/go/storage/loader_test.go index b8819400d..bb255d576 100644 --- a/pp/go/storage/loader_test.go +++ b/pp/go/storage/loader_test.go @@ -207,7 +207,7 @@ func (s *HeadLoadSuite) TestLoadWithDisabledDataUnloading() { // Act loadedHead := s.mustLoadHead(0) - queryResult := s.shards(loadedHead)[0].DataStorage().Query(cppbridge.HeadDataStorageQuery{ + queryResult := s.shards(loadedHead)[0].DataStorage().Query(cppbridge.DataStorageQuery{ StartTimestampMs: 0, EndTimestampMs: 2, LabelSetIDs: []uint32{0}, @@ -259,7 +259,7 @@ func (s *HeadLoadSuite) TestAppendAfterLoad() { }, }) - queryResult := s.shards(loadedHead)[0].DataStorage().Query(cppbridge.HeadDataStorageQuery{ + queryResult := s.shards(loadedHead)[0].DataStorage().Query(cppbridge.DataStorageQuery{ StartTimestampMs: 0, EndTimestampMs: 4, LabelSetIDs: []uint32{0}, diff --git a/pp/go/storage/querier/instant_series.go b/pp/go/storage/querier/instant_series.go new file mode 100644 index 000000000..9ec0ce2af --- /dev/null +++ b/pp/go/storage/querier/instant_series.go @@ -0,0 +1,199 @@ +package querier + +import ( + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/pp/go/cppbridge" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" +) + +// +// InstantSeriesSet +// + +// NewInstantSeriesSlice creates InstantSeries slice and sets default timestamp to each series. 
+func NewInstantSeriesSlice(size int, defaultTimestamp int64) []InstantSeries { + seriesSlice := make([]InstantSeries, size) + for i := range seriesSlice { + seriesSlice[i].Timestamp = defaultTimestamp + } + return seriesSlice +} + +// InstantSeriesSet is an instant set of series; iteration visits only the series whose Timestamp differs from valueNotFoundTimestampValue. +type InstantSeriesSet struct { + lssQueryResult *cppbridge.LSSQueryResult + labelSetSnapshot *cppbridge.LabelSetSnapshot + valueNotFoundTimestampValue int64 + nextSeriesIndex int + instantSeries []InstantSeries +} + +// NewInstantSeriesSet creates a new [InstantSeriesSet] over instantSeries. +func NewInstantSeriesSet( + lssQueryResult *cppbridge.LSSQueryResult, + labelSetSnapshot *cppbridge.LabelSetSnapshot, + valueNotFoundTimestampValue int64, + instantSeries []InstantSeries, +) *InstantSeriesSet { + return &InstantSeriesSet{ + lssQueryResult: lssQueryResult, + labelSetSnapshot: labelSetSnapshot, + valueNotFoundTimestampValue: valueNotFoundTimestampValue, + instantSeries: instantSeries, + } +} + +// At returns a pointer to the series the last successful Next stopped on (index nextSeriesIndex-1), so it must only be called after Next has returned true. +func (ss *InstantSeriesSet) At() storage.Series { + return &ss.instantSeries[ss.nextSeriesIndex-1] +} + +// Err returns the error that iteration has failed with; it is always nil. +func (*InstantSeriesSet) Err() error { + return nil +} + +// Next advances to the next series whose Timestamp is not valueNotFoundTimestampValue, sets its LabelSet and reports whether such a series exists. +func (ss *InstantSeriesSet) Next() bool { + for { + if ss.nextSeriesIndex >= len(ss.instantSeries) { + return false + } + + if ss.instantSeries[ss.nextSeriesIndex].Timestamp != ss.valueNotFoundTimestampValue { + break + } + + ss.nextSeriesIndex++ + } + + lsID, lsLength := ss.lssQueryResult.GetByIndex(ss.nextSeriesIndex) + ss.instantSeries[ss.nextSeriesIndex].LabelSet = labels.NewLabelsWithLSS( + ss.labelSetSnapshot, + lsID, + lsLength, + ) + + ss.nextSeriesIndex++ + return true +} + +// Warnings returns a collection of warnings for the whole set; it is always nil. 
+func (*InstantSeriesSet) Warnings() annotations.Annotations { + return nil +} + +// +// InstantSeries +// + +// InstantSeries is a single instant data point of a metric together with its label set. NOTE(review): a pointer to a slice of these is handed to the data storage as uintptr, so the field order presumably has to match the native side; confirm before reordering fields. +type InstantSeries struct { + Timestamp int64 + Value float64 + LabelSet labels.Labels +} + +// Iterator implements storage.Series; it resets and reuses iterator when it is an *InstantSeriesChunkIterator and creates a new one otherwise. +func (s *InstantSeries) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { + if i, ok := iterator.(*InstantSeriesChunkIterator); ok { + i.ResetTo(s.Timestamp, s.Value) + return i + } + return NewInstantSeriesChunkIterator(s.Timestamp, s.Value) +} + +// Labels implements storage.Series and returns the LabelSet of the series. +func (s *InstantSeries) Labels() labels.Labels { + return s.LabelSet +} + +// +// InstantSeriesChunkIterator +// + +// InstantSeriesChunkIterator iterates over the single float sample of an instant time series; it can only move forward. +type InstantSeriesChunkIterator struct { + i int + t int64 + v float64 +} + +// NewInstantSeriesChunkIterator creates a new [InstantSeriesChunkIterator] positioned before the sample (t, v). +func NewInstantSeriesChunkIterator(t int64, v float64) *InstantSeriesChunkIterator { + return &InstantSeriesChunkIterator{ + i: -1, + t: t, + v: v, + } +} + +// At returns the stored timestamp/value pair, regardless of the iterator position. +// +//nolint:gocritic // unnamedResult not need +func (i *InstantSeriesChunkIterator) At() (int64, float64) { + return i.t, i.v +} + +// AtFloatHistogram always returns 0, nil: the iterator only ever reports float samples. +func (*InstantSeriesChunkIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return 0, nil +} + +// AtHistogram always returns 0, nil: the iterator only ever reports float samples. +func (*InstantSeriesChunkIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { + return 0, nil +} + +// AtT returns the current timestamp. 
+func (i *InstantSeriesChunkIterator) AtT() int64 { + return i.t +} + +// Err returns the current error; it is always nil. +func (*InstantSeriesChunkIterator) Err() error { + return nil +} + +// Next advances the iterator by one and returns the type of the value: ValFloat on the first call after creation or ResetTo, ValNone on every later call. +func (i *InstantSeriesChunkIterator) Next() chunkenc.ValueType { + if i.i < 1 { + i.i++ + } + return i.valueType() +} + +// ResetTo rewinds the iterator to the position before the sample and replaces the stored timestamp and value. +func (i *InstantSeriesChunkIterator) ResetTo(t int64, v float64) { + i.i = -1 + i.t = t + i.v = v +} + +// Seek advances the iterator forward to the first sample with a timestamp equal or greater than t; it returns ValNone once the single sample is consumed or when its timestamp is less than t. +func (i *InstantSeriesChunkIterator) Seek(t int64) chunkenc.ValueType { + if i.valueType() == chunkenc.ValFloat && i.t >= t { + return chunkenc.ValFloat + } + + for { + if i.Next() == chunkenc.ValNone { + return chunkenc.ValNone + } + + if i.t >= t { + return chunkenc.ValFloat + } + } +} + +func (i *InstantSeriesChunkIterator) valueType() chunkenc.ValueType { + if i.i == 0 { + return chunkenc.ValFloat + } + + return chunkenc.ValNone +} diff --git a/pp/go/storage/querier/instant_series_bench_test.go b/pp/go/storage/querier/instant_series_bench_test.go new file mode 100644 index 000000000..5739b89b7 --- /dev/null +++ b/pp/go/storage/querier/instant_series_bench_test.go @@ -0,0 +1,62 @@ +package querier_test + +import ( + "fmt" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/pp/go/cppbridge" + "github.com/prometheus/prometheus/pp/go/model" + "github.com/prometheus/prometheus/pp/go/storage/head/shard" + "github.com/prometheus/prometheus/pp/go/storage/querier" + "github.com/prometheus/prometheus/pp/go/storage/storagetest" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/require" + "testing" +) + +func prepareInstantData(lss *shard.LSS, ds *shard.DataStorage, timeStamps []int64, size int) { + timeSeries := make([]storagetest.TimeSeries, 0, size*len(timeStamps)) + for _, ts := range timeStamps { + for i := 0; 
i < size; i++ { + label := fmt.Sprintf("index_%d", i) + timeSeries = append(timeSeries, storagetest.TimeSeries{ + Labels: labels.FromStrings("__name__", "metric", "job", label, "container", "", "id", "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod37ce076d8d523c8b0c8c0b6191d927f6.slice/cri-containerd-bdd69edcd2fb187baa3381810051e8cc7b8a0d0368e168040f93adb3260582b2.scope", "image", "registry.k8s.io/pause:3.8", "name", "bdd69edcd2fb187baa3381810051e8cc7b8a0d0368e168040f93adb3260582b2", "namespace", "kube-system", "pod", "kube-scheduler-m1.k8s.lan"), + Samples: []cppbridge.Sample{ + {Timestamp: ts, Value: float64(i)}, + }, + }) + } + + } + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(lss, ds, timeSeries...) +} + +func BenchmarkInstantSeriesSet(b *testing.B) { + size := 500000 + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + lss := shard.NewLSS() + ds := shard.NewDataStorage() + timestamps := []int64{0, 1, 2} + valueNotFoundTimestampValue := timestamps[0] - 1 + prepareInstantData(lss, ds, timestamps, size) + + b.StopTimer() + seriesSets := make([]*querier.InstantSeriesSet, 0, b.N) + for i := 0; i < b.N; i++ { + seriesSet, err := storagetest.InstantQuery(lss, ds, timestamps[1], valueNotFoundTimestampValue, matcher) + require.NoError(b, err) + seriesSets = append(seriesSets, seriesSet) + } + + b.ReportAllocs() + b.ResetTimer() + b.StartTimer() + var iterator chunkenc.Iterator + for i := 0; i < b.N; i++ { + iterator = storagetest.IterateSeriesSet(seriesSets[i], iterator) + } +} diff --git a/pp/go/storage/querier/instant_series_test.go b/pp/go/storage/querier/instant_series_test.go new file mode 100644 index 000000000..764e5186b --- /dev/null +++ b/pp/go/storage/querier/instant_series_test.go @@ -0,0 +1,127 @@ +package querier_test + +import ( + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/pp/go/cppbridge" + 
"github.com/prometheus/prometheus/pp/go/model" + "github.com/prometheus/prometheus/pp/go/storage/head/shard" + "github.com/prometheus/prometheus/pp/go/storage/storagetest" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "testing" +) + +type InstantSeriesTestSuite struct { + suite.Suite + + lss *shard.LSS + ds *shard.DataStorage + data []storagetest.TimeSeries + valueNotFoundTimestampValue int64 +} + +func TestInstantSeriesTestSuite(t *testing.T) { + suite.Run(t, new(InstantSeriesTestSuite)) +} + +func (s *InstantSeriesTestSuite) SetupTest() { + s.lss = shard.NewLSS() + s.ds = shard.NewDataStorage() + s.data = []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__name__", "metric", "job", "test"), + Samples: []cppbridge.Sample{ + {Timestamp: 10, Value: 0}, + }, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test"), + Samples: []cppbridge.Sample{ + {Timestamp: 11, Value: 1}, + }, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test"), + Samples: []cppbridge.Sample{ + {Timestamp: 12, Value: 2}, + }, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test_2"), + Samples: []cppbridge.Sample{ + {Timestamp: 10, Value: 0}, + }, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test_2"), + Samples: []cppbridge.Sample{ + {Timestamp: 11, Value: 1}, + }, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test_2"), + Samples: []cppbridge.Sample{ + {Timestamp: 12, Value: 2}, + }, + }, + } + + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.data...) 
+} + +func (s *InstantSeriesTestSuite) TestSuccess() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + targetTimestamp := int64(11) + expected := []storagetest.TimeSeries{s.data[1], s.data[4]} + + // Act + seriesSet, err := storagetest.InstantQuery(s.lss, s.ds, targetTimestamp, s.valueNotFoundTimestampValue, matcher) + + // Assert + require.NoError(s.T(), err) + require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) +} + +func (s *InstantSeriesTestSuite) TestEmptyResult() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + targetTimestamp := int64(4) + expected := []storagetest.TimeSeries{} + + // Act + seriesSet, err := storagetest.InstantQuery(s.lss, s.ds, targetTimestamp, s.valueNotFoundTimestampValue, matcher) + + // Assert + require.NoError(s.T(), err) + require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) +} + +func (s *InstantSeriesTestSuite) TestBigTargetTimestamp() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + targetTimestamp := int64(1000000) + expected := []storagetest.TimeSeries{s.data[2], s.data[5]} + + // Act + seriesSet, err := storagetest.InstantQuery(s.lss, s.ds, targetTimestamp, s.valueNotFoundTimestampValue, matcher) + + // Assert + require.NoError(s.T(), err) + require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) +} diff --git a/pp/go/storage/querier/interface.go b/pp/go/storage/querier/interface.go index d51fe2ed1..ea89228a3 100644 --- a/pp/go/storage/querier/interface.go +++ b/pp/go/storage/querier/interface.go @@ -40,19 +40,20 @@ type Task interface { // DataStorage the minimum required [DataStorage] implementation. 
type DataStorage interface { - // InstantQuery returns samples for instant query from data storage. + // InstantQuery fills samples for instant query from data storage. InstantQuery( - maxt, valueNotFoundTimestampValue int64, + maxt int64, ids []uint32, - ) ([]cppbridge.Sample, cppbridge.DataStorageQueryResult) + samples uintptr, + ) cppbridge.DataStorageQueryResult // Query returns serialized chunks from data storage. Query( - query cppbridge.HeadDataStorageQuery, + query cppbridge.DataStorageQuery, ) cppbridge.DataStorageQueryResult - // WithRLock calls fn on raw [cppbridge.HeadDataStorage] with read lock. - WithRLock(fn func(ds *cppbridge.HeadDataStorage) error) error + // WithRLock calls fn on raw [cppbridge.DataStorage] with read lock. + WithRLock(fn func(ds *cppbridge.DataStorage) error) error } // diff --git a/pp/go/storage/querier/querier.go b/pp/go/storage/querier/querier.go index 89fefb7e7..b273089b8 100644 --- a/pp/go/storage/querier/querier.go +++ b/pp/go/storage/querier/querier.go @@ -6,16 +6,16 @@ import ( "fmt" "sort" "time" + "unsafe" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/util/annotations" - "github.com/prometheus/prometheus/pp/go/cppbridge" "github.com/prometheus/prometheus/pp/go/logger" "github.com/prometheus/prometheus/pp/go/model" "github.com/prometheus/prometheus/pp/go/util/locker" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" ) const ( @@ -199,11 +199,13 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectInstant( shardID := s.ShardID() lssQueryResult := lssQueryResults[shardID] if lssQueryResult == nil { - seriesSets[shardID] = &SeriesSet{} + seriesSets[shardID] = &InstantSeriesSet{} return nil } - samples, result := s.DataStorage().InstantQuery(q.maxt, valueNotFoundTimestampValue, lssQueryResult.IDs()) + instantSeries := 
NewInstantSeriesSlice(lssQueryResult.Len(), valueNotFoundTimestampValue) + + result := s.DataStorage().InstantQuery(q.maxt, lssQueryResult.IDs(), uintptr(unsafe.Pointer(unsafe.SliceData(instantSeries)))) if result.Status == cppbridge.DataStorageQueryStatusNeedDataLoad { loadAndQueryWaiter.Add(s, result.Querier) } @@ -212,7 +214,7 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectInstant( lssQueryResult, snapshots[shardID], valueNotFoundTimestampValue, - samples, + instantSeries, ) return nil @@ -221,7 +223,7 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectInstant( q.head.Enqueue(tDataStorageQuery) _ = tDataStorageQuery.Wait() - if err := loadAndQueryWaiter.Wait(); err != nil { + if err = loadAndQueryWaiter.Wait(); err != nil { SendUnrecoverableError(err) return storage.ErrSeriesSet(err) } @@ -315,7 +317,7 @@ func queryDataStorage[ } var result cppbridge.DataStorageQueryResult - result = s.DataStorage().Query(cppbridge.HeadDataStorageQuery{ + result = s.DataStorage().Query(cppbridge.DataStorageQuery{ StartTimestampMs: mint, EndTimestampMs: maxt, LabelSetIDs: lssQueryResult.IDs(), diff --git a/pp/go/storage/querier/series.go b/pp/go/storage/querier/series.go index 3ec070619..81916aad1 100644 --- a/pp/go/storage/querier/series.go +++ b/pp/go/storage/querier/series.go @@ -233,186 +233,3 @@ func (s *SeriesSet) Err() error { func (s *SeriesSet) Warnings() annotations.Annotations { return nil } - -// -// InstantSeriesSet -// - -// InstantSeriesSet contains a instatnt set of series, allows to iterate over sorted, populated series. -type InstantSeriesSet struct { - lssQueryResult *cppbridge.LSSQueryResult - labelSetSnapshot *cppbridge.LabelSetSnapshot - valueNotFoundTimestampValue int64 - samples []cppbridge.Sample - nextSampleIndex int - series []InstantSeries -} - -// NewInstantSeriesSet init new [InstantSeriesSet]. 
-func NewInstantSeriesSet( - lssQueryResult *cppbridge.LSSQueryResult, - labelSetSnapshot *cppbridge.LabelSetSnapshot, - valueNotFoundTimestampValue int64, - samples []cppbridge.Sample, -) *InstantSeriesSet { - return &InstantSeriesSet{ - lssQueryResult: lssQueryResult, - labelSetSnapshot: labelSetSnapshot, - valueNotFoundTimestampValue: valueNotFoundTimestampValue, - samples: samples, - series: make([]InstantSeries, 0, len(samples)), - } -} - -// At returns full series. Returned series should be iterable even after Next is called. -func (ss *InstantSeriesSet) At() storage.Series { - return &ss.series[len(ss.series)-1] -} - -// Err the error that iteration as failed with. -func (*InstantSeriesSet) Err() error { - return nil -} - -// Next return true if exist there is a next series and false otherwise. -func (ss *InstantSeriesSet) Next() bool { - for { - if ss.nextSampleIndex >= ss.lssQueryResult.Len() { - return false - } - - if ss.samples[ss.nextSampleIndex].Timestamp != ss.valueNotFoundTimestampValue { - break - } - ss.nextSampleIndex++ - } - - lsID, lsLength := ss.lssQueryResult.GetByIndex(ss.nextSampleIndex) - ss.series = append(ss.series, InstantSeries{ - labelSet: labels.NewLabelsWithLSS( - ss.labelSetSnapshot, - lsID, - lsLength, - ), - sample: &ss.samples[ss.nextSampleIndex], - }) - - ss.nextSampleIndex++ - return true -} - -// Warnings a collection of warnings for the whole set. -func (*InstantSeriesSet) Warnings() annotations.Annotations { - return nil -} - -// -// InstantSeries -// - -// InstantSeries is a instant stream of data points belonging to a metric. -type InstantSeries struct { - labelSet labels.Labels - sample *cppbridge.Sample -} - -// Iterator is storage.Series interface implementation. 
-func (s *InstantSeries) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - if i, ok := iterator.(*InstantSeriesChunkIterator); ok { - i.ResetTo(s.sample.Timestamp, s.sample.Value) - return i - } - return NewInstantSeriesChunkIterator(s.sample.Timestamp, s.sample.Value) -} - -// Labels is storage.Series interface implementation. -func (s *InstantSeries) Labels() labels.Labels { - return s.labelSet -} - -// -// InstantSeriesChunkIterator -// - -// InstantSeriesChunkIterator iterates over the samples of a instant time series, that can only get the next value. -type InstantSeriesChunkIterator struct { - i int - t int64 - v float64 -} - -// NewInstantSeriesChunkIterator init new [InstantSeriesChunkIterator]. -func NewInstantSeriesChunkIterator(t int64, v float64) *InstantSeriesChunkIterator { - return &InstantSeriesChunkIterator{ - i: -1, - t: t, - v: v, - } -} - -// At returns the current timestamp/value pair if the value is a float. -// -//nolint:gocritic // unnamedResult not need -func (i *InstantSeriesChunkIterator) At() (int64, float64) { - return i.t, i.v -} - -// AtFloatHistogram returns the current timestamp/value pair if the value is a histogram with floating-point counts. -func (*InstantSeriesChunkIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - return 0, nil -} - -// AtHistogram returns the current timestamp/value pair if the value is a histogram with integer counts. -func (*InstantSeriesChunkIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { - return 0, nil -} - -// AtT returns the current timestamp. -func (i *InstantSeriesChunkIterator) AtT() int64 { - return i.t -} - -// Err returns the current error. -func (*InstantSeriesChunkIterator) Err() error { - return nil -} - -// Next advances the iterator by one and returns the type of the value. 
-func (i *InstantSeriesChunkIterator) Next() chunkenc.ValueType { - if i.i < 1 { - i.i++ - } - return i.valueType() -} - -// ResetTo reset state to timestamp and value. -func (i *InstantSeriesChunkIterator) ResetTo(t int64, v float64) { - i.i = -1 - i.t = t - i.v = v -} - -// Seek advances the iterator forward to the first sample with a timestamp equal or greater than t. -func (i *InstantSeriesChunkIterator) Seek(t int64) chunkenc.ValueType { - if i.valueType() == chunkenc.ValFloat && i.t >= t { - return chunkenc.ValFloat - } - - for { - if i.Next() == chunkenc.ValNone { - return chunkenc.ValNone - } - - if i.t >= t { - return chunkenc.ValFloat - } - } -} - -func (i *InstantSeriesChunkIterator) valueType() chunkenc.ValueType { - if i.i == 0 { - return chunkenc.ValFloat - } - - return chunkenc.ValNone -} diff --git a/pp/go/storage/querier/series_bench_test.go b/pp/go/storage/querier/series_bench_test.go index 7b48b70c0..26b9e783d 100644 --- a/pp/go/storage/querier/series_bench_test.go +++ b/pp/go/storage/querier/series_bench_test.go @@ -40,7 +40,7 @@ func queryOpt(t testing.TB, lss *shard.LSS, ds *shard.DataStorage, start, end in return &querier.SeriesSet{} } - dsQueryResult := ds.Query(cppbridge.HeadDataStorageQuery{ + dsQueryResult := ds.Query(cppbridge.DataStorageQuery{ StartTimestampMs: start, EndTimestampMs: end, LabelSetIDs: lssQueryResult.IDs(), @@ -50,23 +50,6 @@ func queryOpt(t testing.TB, lss *shard.LSS, ds *shard.DataStorage, start, end in return querier.NewSeriesSet(start, end, lssQueryResult, snapshot, dsQueryResult.SerializedData) } -func instantQuery(t testing.TB, lss *shard.LSS, ds *shard.DataStorage, targetTimestamp, valueNotFoundTimestampValue int64, matchers ...model.LabelMatcher) *querier.InstantSeriesSet { - selector, snapshot, err := lss.QuerySelector(0, matchers) - require.NoError(t, err) - if selector == 0 || snapshot == nil { - return &querier.InstantSeriesSet{} - } - - lssQueryResult := snapshot.Query(selector) - if lssQueryResult.Status() 
== cppbridge.LSSQueryStatusNoMatch { - return &querier.InstantSeriesSet{} - } - - samples, dsQueryResult := ds.InstantQuery(targetTimestamp, valueNotFoundTimestampValue, lssQueryResult.IDs()) - require.Equal(t, cppbridge.DataStorageQueryStatusSuccess, dsQueryResult.Status) - return querier.NewInstantSeriesSet(lssQueryResult, snapshot, valueNotFoundTimestampValue, samples) -} - func BenchmarkSeriesSetOpt(b *testing.B) { size := 500000 matcher := model.LabelMatcher{ @@ -81,6 +64,7 @@ func BenchmarkSeriesSetOpt(b *testing.B) { ds := shard.NewDataStorage() prepareData(lss, ds, size) + b.StopTimer() seriesSets := make([]*querier.SeriesSet, 0, b.N) for i := 0; i < b.N; i++ { seriesSets = append(seriesSets, queryOpt(b, lss, ds, start, end, matcher)) @@ -106,46 +90,3 @@ func prepareData(lss *shard.LSS, ds *shard.DataStorage, size int) { } storagetest.MustAppendTimeSeriesToLSSAndDataStorage(lss, ds, timeSeries...) } - -func prepareInstantData(lss *shard.LSS, ds *shard.DataStorage, timeStamps []int64, size int) { - timeSeries := make([]storagetest.TimeSeries, 0, size) - for _, ts := range timeStamps { - for i := 0; i < size; i++ { - label := fmt.Sprintf("index_%d", i) - timeSeries = append(timeSeries, storagetest.TimeSeries{ - Labels: labels.FromStrings("__name__", "metric", "job", label, "container", "", "id", "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod37ce076d8d523c8b0c8c0b6191d927f6.slice/cri-containerd-bdd69edcd2fb187baa3381810051e8cc7b8a0d0368e168040f93adb3260582b2.scope", "image", "registry.k8s.io/pause:3.8", "name", "bdd69edcd2fb187baa3381810051e8cc7b8a0d0368e168040f93adb3260582b2", "namespace", "kube-system", "pod", "kube-scheduler-m1.k8s.lan"), - Samples: []cppbridge.Sample{ - {Timestamp: ts, Value: float64(i)}, - }, - }) - } - - } - storagetest.MustAppendTimeSeriesToLSSAndDataStorage(lss, ds, timeSeries...) 
-} - -func BenchmarkInstantSeriesSet(b *testing.B) { - size := 500000 - matcher := model.LabelMatcher{ - Name: "__name__", - Value: "metric", - MatcherType: model.MatcherTypeExactMatch, - } - - lss := shard.NewLSS() - ds := shard.NewDataStorage() - timestamps := []int64{0, 1, 2} - valueNotFoundTimestampValue := timestamps[0] - 1 - prepareInstantData(lss, ds, timestamps, size) - - seriesSets := make([]*querier.InstantSeriesSet, 0, b.N) - for i := 0; i < b.N; i++ { - seriesSets = append(seriesSets, instantQuery(b, lss, ds, timestamps[1], valueNotFoundTimestampValue, matcher)) - } - - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - iterateSeriesSet(seriesSets[i]) - } -} diff --git a/pp/go/storage/querier/series_test.go b/pp/go/storage/querier/series_test.go index 8b018d972..b04851802 100644 --- a/pp/go/storage/querier/series_test.go +++ b/pp/go/storage/querier/series_test.go @@ -83,7 +83,7 @@ func (s *SeriesSetTestSuite) query(lss *shard.LSS, ds *shard.DataStorage, start, return &querier.SeriesSet{} } - dsQueryResult := ds.Query(cppbridge.HeadDataStorageQuery{ + dsQueryResult := ds.Query(cppbridge.DataStorageQuery{ StartTimestampMs: start, EndTimestampMs: end, LabelSetIDs: lssQueryResult.IDs(), diff --git a/pp/go/storage/querier/status_querier.go b/pp/go/storage/querier/status_querier.go index 893ddd0e7..09ece53ca 100644 --- a/pp/go/storage/querier/status_querier.go +++ b/pp/go/storage/querier/status_querier.go @@ -65,7 +65,7 @@ func QueryHeadStatus[ tDataStorageHeadStatus := head.CreateTask( dsHeadStatus, func(shard TShard) error { - return shard.DataStorage().WithRLock(func(ds *cppbridge.HeadDataStorage) error { + return shard.DataStorage().WithRLock(func(ds *cppbridge.DataStorage) error { shardStatuses[shard.ShardID()].FromDataStorage(ds) return nil diff --git a/pp/go/storage/storagetest/fixtures.go b/pp/go/storage/storagetest/fixtures.go index c241dd66a..45565573c 100644 --- a/pp/go/storage/storagetest/fixtures.go +++ 
b/pp/go/storage/storagetest/fixtures.go @@ -2,10 +2,12 @@ package storagetest import ( "context" + "fmt" "math" "os" "path/filepath" "time" + "unsafe" "github.com/jonboulle/clockwork" "github.com/prometheus/prometheus/model/labels" @@ -16,6 +18,7 @@ import ( "github.com/prometheus/prometheus/pp/go/storage/catalog" "github.com/prometheus/prometheus/pp/go/storage/head/services" "github.com/prometheus/prometheus/pp/go/storage/head/shard" + "github.com/prometheus/prometheus/pp/go/storage/querier" promstorage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/suite" @@ -160,6 +163,43 @@ func TimeSeriesFromSeries(series promstorage.Series, chunkIterator chunkenc.Iter return timeSeries } +func IterateSeriesSet(seriesSet promstorage.SeriesSet, iterator chunkenc.Iterator) chunkenc.Iterator { + var series promstorage.Series + for seriesSet.Next() { + series = seriesSet.At() + iterator = series.Iterator(iterator) + for iterator.Next() != chunkenc.ValNone { + ts, v := iterator.At() + _, _ = ts, v + } + } + return iterator +} + +func InstantQuery(lss *shard.LSS, ds *shard.DataStorage, targetTimestamp, valueNotFoundTimestampValue int64, matchers ...model.LabelMatcher) (*querier.InstantSeriesSet, error) { + selector, snapshot, err := lss.QuerySelector(0, matchers) + if err != nil { + return nil, err + } + if selector == 0 || snapshot == nil { + return &querier.InstantSeriesSet{}, nil + } + + lssQueryResult := snapshot.Query(selector) + if lssQueryResult.Status() == cppbridge.LSSQueryStatusNoMatch { + return &querier.InstantSeriesSet{}, nil + } + + instantSeries := querier.NewInstantSeriesSlice(lssQueryResult.Len(), valueNotFoundTimestampValue) + + dsQueryResult := ds.InstantQuery(targetTimestamp, lssQueryResult.IDs(), uintptr(unsafe.Pointer(unsafe.SliceData(instantSeries)))) + if dsQueryResult.Status != cppbridge.DataStorageQueryStatusSuccess { + return nil, fmt.Errorf("invalid data storage query result 
status: %v", dsQueryResult.Status) + } + + return querier.NewInstantSeriesSet(lssQueryResult, snapshot, valueNotFoundTimestampValue, instantSeries), nil +} + const ( NumberOfShards uint16 = 2 MaxSegmentSize uint32 = 1024