-
Notifications
You must be signed in to change notification settings - Fork 173
/
Copy pathquery.h
223 lines (177 loc) · 5.84 KB
/
query.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
#pragma once
#include "block.h"
#include "server_exception.h"
#include "base/open_telemetry.h"
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
namespace clickhouse {
struct QuerySettingsField {
enum Flags : uint64_t
{
IMPORTANT = 0x01,
CUSTOM = 0x02,
};
std::string value;
uint64_t flags{0};
};
using QuerySettings = std::unordered_map<std::string, QuerySettingsField>;
struct Profile {
uint64_t rows = 0;
uint64_t blocks = 0;
uint64_t bytes = 0;
uint64_t rows_before_limit = 0;
bool applied_limit = false;
bool calculated_rows_before_limit = false;
};
struct Progress {
uint64_t rows = 0;
uint64_t bytes = 0;
uint64_t total_rows = 0;
uint64_t written_rows = 0;
uint64_t written_bytes = 0;
};
class QueryEvents {
public:
virtual ~QueryEvents()
{ }
/// Some data was received.
virtual void OnData(const Block& block) = 0;
virtual bool OnDataCancelable(const Block& block) = 0;
virtual void OnServerException(const Exception& e) = 0;
virtual void OnProfile(const Profile& profile) = 0;
virtual void OnProgress(const Progress& progress) = 0;
/** Handle query execution logs provided by server.
* Amount of logs regulated by `send_logs_level` setting.
* By-default only `fatal` log events are sent to the client side.
*/
virtual void OnServerLog(const Block& block) = 0;
/// Handle query execution profile events.
virtual void OnProfileEvents(const Block& block) = 0;
virtual void OnFinish() = 0;
};
using ExceptionCallback = std::function<void(const Exception& e)>;
using ProgressCallback = std::function<void(const Progress& progress)>;
using SelectCallback = std::function<void(const Block& block)>;
using SelectCancelableCallback = std::function<bool(const Block& block)>;
using SelectServerLogCallback = std::function<bool(const Block& block)>;
using ProfileEventsCallback = std::function<bool(const Block& block)>;
class Query : public QueryEvents {
public:
Query();
Query(const char* query, const char* query_id = nullptr);
Query(const std::string& query, const std::string& query_id = default_query_id);
Query(std::string&& query, std::string&& query_id = std::string(default_query_id));
~Query() override;
///
inline const std::string& GetText() const {
return query_;
}
inline const std::string& GetQueryID() const {
return query_id_;
}
inline const QuerySettings& GetQuerySettings() const {
return query_settings_;
}
/// Set per query settings
inline Query& SetQuerySettings(QuerySettings query_settings) {
query_settings_ = std::move(query_settings);
return *this;
}
/// Set per query setting
inline Query& SetSetting(const std::string& key, const QuerySettingsField& value) {
query_settings_[key] = value;
return *this;
}
inline const std::optional<open_telemetry::TracingContext>& GetTracingContext() const {
return tracing_context_;
}
/// Set tracing context for open telemetry signals
inline Query& SetTracingContext(open_telemetry::TracingContext tracing_context) {
tracing_context_ = std::move(tracing_context);
return *this;
}
/// Set handler for receiving result data.
inline Query& OnData(SelectCallback cb) {
select_cb_ = std::move(cb);
return *this;
}
inline Query& OnDataCancelable(SelectCancelableCallback cb) {
select_cancelable_cb_ = std::move(cb);
return *this;
}
/// Set handler for receiving server's exception.
inline Query& OnException(ExceptionCallback cb) {
exception_cb_ = std::move(cb);
return *this;
}
/// Set handler for receiving a progress of query execution.
inline Query& OnProgress(ProgressCallback cb) {
progress_cb_ = std::move(cb);
return *this;
}
/// Set handler for receiving a server log of query exceution.
inline Query& OnServerLog(SelectServerLogCallback cb) {
select_server_log_cb_ = std::move(cb);
return *this;
}
/// Set handler for receiving profile events.
inline Query& OnProfileEvents(ProfileEventsCallback cb) {
profile_events_callback_cb_ = std::move(cb);
return *this;
}
static const std::string default_query_id;
private:
void OnData(const Block& block) override {
if (select_cb_) {
select_cb_(block);
}
}
bool OnDataCancelable(const Block& block) override {
if (select_cancelable_cb_) {
return select_cancelable_cb_(block);
} else {
return true;
}
}
void OnServerException(const Exception& e) override {
if (exception_cb_) {
exception_cb_(e);
}
}
void OnProfile(const Profile& profile) override {
(void)profile;
}
void OnProgress(const Progress& progress) override {
if (progress_cb_) {
progress_cb_(progress);
}
}
void OnServerLog(const Block& block) override {
if (select_server_log_cb_) {
select_server_log_cb_(block);
}
}
void OnProfileEvents(const Block& block) override {
if (profile_events_callback_cb_) {
profile_events_callback_cb_(block);
}
}
void OnFinish() override {
}
private:
const std::string query_;
const std::string query_id_;
std::optional<open_telemetry::TracingContext> tracing_context_;
QuerySettings query_settings_;
ExceptionCallback exception_cb_;
ProgressCallback progress_cb_;
SelectCallback select_cb_;
SelectCancelableCallback select_cancelable_cb_;
SelectServerLogCallback select_server_log_cb_;
ProfileEventsCallback profile_events_callback_cb_;
};
}