Skip to content

Commit 88e1346

Browse files
committed
allow querier to execute logical plan fragments following child-root-execution model
Signed-off-by: rubywtl <[email protected]>
1 parent eab2099 commit 88e1346

16 files changed

+1840
-37
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
//go:build integration_query_fuzz
2+
// +build integration_query_fuzz
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"math/rand"
9+
"path"
10+
"strconv"
11+
"strings"
12+
"testing"
13+
"time"
14+
15+
"github.com/cortexproject/promqlsmith"
16+
"github.com/prometheus/prometheus/model/labels"
17+
"github.com/prometheus/prometheus/prompb"
18+
"github.com/stretchr/testify/require"
19+
20+
"github.com/cortexproject/cortex/integration/e2e"
21+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
22+
"github.com/cortexproject/cortex/integration/e2ecortex"
23+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
24+
)
25+
26+
func TestDistributedExecutionFuzz(t *testing.T) {
27+
s, err := e2e.NewScenario(networkName)
28+
require.NoError(t, err)
29+
defer s.Close()
30+
31+
// start dependencies.
32+
consul1 := e2edb.NewConsulWithName("consul1")
33+
consul2 := e2edb.NewConsulWithName("consul2")
34+
require.NoError(t, s.StartAndWaitReady(consul1, consul2))
35+
36+
flags := mergeFlags(
37+
AlertmanagerLocalFlags(),
38+
map[string]string{
39+
"-store.engine": blocksStorageEngine,
40+
"-blocks-storage.backend": "filesystem",
41+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
42+
"-blocks-storage.tsdb.block-ranges-period": "2h",
43+
"-blocks-storage.tsdb.ship-interval": "1h",
44+
"-blocks-storage.bucket-store.sync-interval": "15m",
45+
"-blocks-storage.tsdb.retention-period": "2h",
46+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
47+
"-querier.query-store-for-labels-enabled": "true",
48+
// Ingester.
49+
"-ring.store": "consul",
50+
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
51+
// Distributor.
52+
"-distributor.replication-factor": "1",
53+
// Store-gateway.
54+
"-store-gateway.sharding-enabled": "false",
55+
// alert manager
56+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
57+
},
58+
)
59+
// make alert manager config dir
60+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
61+
62+
path1 := path.Join(s.SharedDir(), "cortex-1")
63+
path2 := path.Join(s.SharedDir(), "cortex-2")
64+
65+
flags1 := mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path1})
66+
67+
// Start first Cortex replicas
68+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), flags1, "")
69+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), flags1, "")
70+
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags1, "")
71+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), flags1, "")
72+
require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway))
73+
flags1 = mergeFlags(flags1, map[string]string{
74+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
75+
})
76+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags1, map[string]string{
77+
"-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
78+
}), "")
79+
require.NoError(t, s.Start(queryFrontend))
80+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul1.NetworkHTTPEndpoint(), mergeFlags(flags1, map[string]string{
81+
"-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
82+
}), "")
83+
require.NoError(t, s.StartAndWaitReady(querier))
84+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
85+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
86+
c1, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
87+
require.NoError(t, err)
88+
89+
// Enable distributed execution for the second Cortex instance.
90+
flags2 := mergeFlags(flags, map[string]string{
91+
"-frontend.query-vertical-shard-size": "2",
92+
"-blocks-storage.filesystem.dir": path2,
93+
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
94+
"-querier.thanos-engine": "true",
95+
"-querier.distributed-exec-enabled": "true",
96+
"-api.querier-default-codec": "protobuf",
97+
})
98+
99+
distributor2 := e2ecortex.NewDistributor("distributor2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), flags2, "")
100+
ingester2 := e2ecortex.NewIngester("ingester2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), flags2, "")
101+
queryScheduler2 := e2ecortex.NewQueryScheduler("query-scheduler2", flags2, "")
102+
storeGateway2 := e2ecortex.NewStoreGateway("store-gateway2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), flags2, "")
103+
require.NoError(t, s.StartAndWaitReady(queryScheduler2, distributor2, ingester2, storeGateway2))
104+
flags2 = mergeFlags(flags1, map[string]string{
105+
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway2.NetworkGRPCEndpoint()}, ","),
106+
})
107+
queryFrontend2 := e2ecortex.NewQueryFrontend("query-frontend2", mergeFlags(flags2, map[string]string{
108+
"-frontend.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
109+
}), "")
110+
require.NoError(t, s.Start(queryFrontend2))
111+
querier2 := e2ecortex.NewQuerier("querier2", e2ecortex.RingStoreConsul, consul2.NetworkHTTPEndpoint(), mergeFlags(flags2, map[string]string{
112+
"-querier.scheduler-address": queryScheduler2.NetworkGRPCEndpoint(),
113+
}), "")
114+
require.NoError(t, s.StartAndWaitReady(querier2))
115+
require.NoError(t, distributor2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
116+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
117+
c2, err := e2ecortex.NewClient(distributor2.HTTPEndpoint(), queryFrontend2.HTTPEndpoint(), "", "", "user-1")
118+
require.NoError(t, err)
119+
120+
now := time.Now()
121+
// Push some series to Cortex.
122+
start := now.Add(-time.Minute * 10)
123+
end := now.Add(-time.Minute * 1)
124+
numSeries := 3
125+
numSamples := 20
126+
lbls := make([]labels.Labels, numSeries*2)
127+
serieses := make([]prompb.TimeSeries, numSeries*2)
128+
scrapeInterval := 30 * time.Second
129+
for i := 0; i < numSeries; i++ {
130+
series := e2e.GenerateSeriesWithSamples("test_series_a", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
131+
serieses[i] = series
132+
builder := labels.NewBuilder(labels.EmptyLabels())
133+
for _, lbl := range series.Labels {
134+
builder.Set(lbl.Name, lbl.Value)
135+
}
136+
lbls[i] = builder.Labels()
137+
}
138+
139+
// Generate another set of series for testing binary expression and vector matching.
140+
for i := numSeries; i < 2*numSeries; i++ {
141+
prompbLabels := []prompb.Label{{Name: "job", Value: "test"}, {Name: "series", Value: strconv.Itoa(i)}}
142+
switch i % 3 {
143+
case 0:
144+
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "200"})
145+
case 1:
146+
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "400"})
147+
default:
148+
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "500"})
149+
}
150+
series := e2e.GenerateSeriesWithSamples("test_series_b", start, scrapeInterval, i*numSamples, numSamples, prompbLabels...)
151+
serieses[i] = series
152+
builder := labels.NewBuilder(labels.EmptyLabels())
153+
for _, lbl := range series.Labels {
154+
builder.Set(lbl.Name, lbl.Value)
155+
}
156+
lbls[i] = builder.Labels()
157+
}
158+
res, err := c1.Push(serieses)
159+
require.NoError(t, err)
160+
require.Equal(t, 200, res.StatusCode)
161+
res, err = c2.Push(serieses)
162+
require.NoError(t, err)
163+
require.Equal(t, 200, res.StatusCode)
164+
165+
waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, end)
166+
167+
rnd := rand.New(rand.NewSource(now.Unix()))
168+
opts := []promqlsmith.Option{
169+
promqlsmith.WithEnableOffset(true),
170+
promqlsmith.WithEnableAtModifier(true),
171+
promqlsmith.WithEnabledFunctions(enabledFunctions),
172+
promqlsmith.WithEnabledAggrs(enabledAggrs),
173+
}
174+
ps := promqlsmith.New(rnd, lbls, opts...)
175+
176+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
177+
}

pkg/api/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
2424
"github.com/cortexproject/cortex/pkg/compactor"
2525
"github.com/cortexproject/cortex/pkg/cortexpb"
26+
"github.com/cortexproject/cortex/pkg/distributed_execution"
27+
"github.com/cortexproject/cortex/pkg/distributed_execution/querierpb"
2628
"github.com/cortexproject/cortex/pkg/distributor"
2729
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
2830
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
@@ -482,6 +484,10 @@ func (a *API) RegisterQueryScheduler(f *scheduler.Scheduler) {
482484
schedulerpb.RegisterSchedulerForQuerierServer(a.server.GRPC, f)
483485
}
484486

487+
func (a *API) RegisterQuerierServer(f *distributed_execution.QuerierServer) {
488+
querierpb.RegisterQuerierServer(a.server.GRPC, f)
489+
}
490+
485491
// RegisterServiceMapHandler registers the Cortex structs service handler
486492
// TODO: Refactor this code to be accomplished using the services.ServiceManager
487493
// or a future module manager #2291

pkg/api/handlers.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
"github.com/weaveworks/common/middleware"
2626

2727
"github.com/cortexproject/cortex/pkg/api/queryapi"
28+
"github.com/cortexproject/cortex/pkg/distributed_execution"
2829
"github.com/cortexproject/cortex/pkg/engine"
2930
"github.com/cortexproject/cortex/pkg/querier"
3031
"github.com/cortexproject/cortex/pkg/querier/codec"
3132
"github.com/cortexproject/cortex/pkg/querier/stats"
33+
"github.com/cortexproject/cortex/pkg/ring/client"
3234
"github.com/cortexproject/cortex/pkg/util"
3335
util_log "github.com/cortexproject/cortex/pkg/util/log"
3436
)
@@ -168,6 +170,9 @@ func NewQuerierHandler(
168170
metadataQuerier querier.MetadataQuerier,
169171
reg prometheus.Registerer,
170172
logger log.Logger,
173+
queryTracker *distributed_execution.QueryTracker,
174+
distributedExecEnabled bool,
175+
querierClientPool *client.Pool,
171176
) http.Handler {
172177
// Prometheus histograms for requests to the querier.
173178
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -284,7 +289,7 @@ func NewQuerierHandler(
284289
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
285290
api.Register(legacyPromRouter)
286291

287-
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
292+
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, queryTracker, distributedExecEnabled, querierClientPool)
288293

289294
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
290295
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/handlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestBuildInfoAPI(t *testing.T) {
235235
version.Version = tc.version
236236
version.Branch = tc.branch
237237
version.Revision = tc.revision
238-
handler := NewQuerierHandler(cfg, querierConfig, nil, nil, nil, nil, nil, &FakeLogger{})
238+
handler := NewQuerierHandler(cfg, querierConfig, nil, nil, nil, nil, nil, &FakeLogger{}, nil, false, nil)
239239
writer := httptest.NewRecorder()
240240
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
241241
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))

0 commit comments

Comments
 (0)