|
44 | 44 | #include <boost/algorithm/string.hpp>
|
45 | 45 | #include <regex>
|
46 | 46 |
|
| 47 | +#include <graphene/utilities/elasticsearch.hpp> |
| 48 | + |
47 | 49 | namespace graphene { namespace elasticsearch {
|
48 | 50 |
|
49 | 51 | namespace detail
|
@@ -74,10 +76,9 @@ class elasticsearch_plugin_impl
|
74 | 76 | bool _elasticsearch_visitor = false;
|
75 | 77 | CURL *curl; // curl handler
|
76 | 78 | vector <string> bulk; // vector of op lines
|
77 |
| - private: |
| 79 | + vector<std::string> prepare; |
| 80 | +private: |
78 | 81 | void add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const signed_block& b );
|
79 |
| - void createBulkLine(account_transaction_history_object ath, operation_history_struct os, int op_type, block_struct bs, visitor_struct vs); |
80 |
| - void sendBulk(std::string _elasticsearch_node_url, bool _elasticsearch_logs); |
81 | 82 |
|
82 | 83 | };
|
83 | 84 |
|
@@ -211,10 +212,28 @@ void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account
|
211 | 212 | else
|
212 | 213 | limit_documents = _elasticsearch_bulk_replay;
|
213 | 214 |
|
214 |
| - createBulkLine(ath, os, op_type, bs, vs); // we have everything, creating bulk line |
| 215 | + // we have everything, creating bulk line |
| 216 | + bulk_struct bulks; |
| 217 | + bulks.account_history = ath; |
| 218 | + bulks.operation_history = os; |
| 219 | + bulks.operation_type = op_type; |
| 220 | + bulks.block_data = bs; |
| 221 | + bulks.additional_data = vs; |
| 222 | + |
| 223 | + std::string data = fc::json::to_string(bulks); |
| 224 | + |
| 225 | + auto block_date = bulks.block_data.block_time.to_iso_string(); |
| 226 | + std::vector<std::string> parts; |
| 227 | + boost::split(parts, block_date, boost::is_any_of("-")); |
| 228 | + std::string index_name = "graphene-" + parts[0] + "-" + parts[1]; // index name |
| 229 | + std::string _id = fc::json::to_string(ath.id); |
| 230 | + |
| 231 | + prepare = graphene::utilities::createBulk(index_name, data, _id, 0); |
| 232 | + bulk.insert(bulk.end(), prepare.begin(), prepare.end()); |
215 | 233 |
|
216 | 234 | if (curl && bulk.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech
|
217 |
| - sendBulk(_elasticsearch_node_url, _elasticsearch_logs); |
| 235 | + prepare.clear(); |
| 236 | + graphene::utilities::SendBulk(curl, bulk, _elasticsearch_node_url, _elasticsearch_logs, "logs-account-history"); |
218 | 237 | }
|
219 | 238 |
|
220 | 239 | // remove everything except current object from ath
|
@@ -243,97 +262,6 @@ void elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account
|
243 | 262 | }
|
244 | 263 | }
|
245 | 264 |
|
246 |
| -void elasticsearch_plugin_impl::createBulkLine(account_transaction_history_object ath, operation_history_struct os, int op_type, block_struct bs, visitor_struct vs) |
247 |
| -{ |
248 |
| - bulk_struct bulks; |
249 |
| - bulks.account_history = ath; |
250 |
| - bulks.operation_history = os; |
251 |
| - bulks.operation_type = op_type; |
252 |
| - bulks.block_data = bs; |
253 |
| - bulks.additional_data = vs; |
254 |
| - |
255 |
| - std::string alltogether = fc::json::to_string(bulks); |
256 |
| - |
257 |
| - auto block_date = bulks.block_data.block_time.to_iso_string(); |
258 |
| - std::vector<std::string> parts; |
259 |
| - boost::split(parts, block_date, boost::is_any_of("-")); |
260 |
| - std::string index_name = "graphene-" + parts[0] + "-" + parts[1]; |
261 |
| - |
262 |
| - // bulk header before each line, op_type = create to avoid dups, index id will be ath id(2.9.X). |
263 |
| - std::string _id = fc::json::to_string(ath.id); |
264 |
| - bulk.push_back("{ \"index\" : { \"_index\" : \""+index_name+"\", \"_type\" : \"data\", \"op_type\" : \"create\", \"_id\" : "+_id+" } }"); // header |
265 |
| - bulk.push_back(alltogether); |
266 |
| -} |
267 |
| - |
268 |
| -void elasticsearch_plugin_impl::sendBulk(std::string _elasticsearch_node_url, bool _elasticsearch_logs) |
269 |
| -{ |
270 |
| - |
271 |
| - // curl buffers to read |
272 |
| - std::string readBuffer; |
273 |
| - std::string readBuffer_logs; |
274 |
| - |
275 |
| - std::string bulking = ""; |
276 |
| - |
277 |
| - bulking = boost::algorithm::join(bulk, "\n"); |
278 |
| - bulking = bulking + "\n"; |
279 |
| - bulk.clear(); |
280 |
| - |
281 |
| - //wlog((bulking)); |
282 |
| - |
283 |
| - struct curl_slist *headers = NULL; |
284 |
| - headers = curl_slist_append(headers, "Content-Type: application/json"); |
285 |
| - std::string url = _elasticsearch_node_url + "_bulk"; |
286 |
| - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); |
287 |
| - curl_easy_setopt(curl, CURLOPT_POST, true); |
288 |
| - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); |
289 |
| - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, bulking.c_str()); |
290 |
| - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); |
291 |
| - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&readBuffer); |
292 |
| - curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); |
293 |
| - //curl_easy_setopt(curl, CURLOPT_VERBOSE, true); |
294 |
| - curl_easy_perform(curl); |
295 |
| - |
296 |
| - long http_code = 0; |
297 |
| - curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); |
298 |
| - if(http_code == 200) { |
299 |
| - // all good, do nothing |
300 |
| - } |
301 |
| - else if(http_code == 429) { |
302 |
| - // repeat request? |
303 |
| - } |
304 |
| - else { |
305 |
| - // exit everything ? |
306 |
| - } |
307 |
| - |
308 |
| - if(_elasticsearch_logs) { |
309 |
| - auto logs = readBuffer; |
310 |
| - // do logs |
311 |
| - std::string url_logs = _elasticsearch_node_url + "logs/data/"; |
312 |
| - curl_easy_setopt(curl, CURLOPT_URL, url_logs.c_str()); |
313 |
| - curl_easy_setopt(curl, CURLOPT_POST, true); |
314 |
| - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); |
315 |
| - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, logs.c_str()); |
316 |
| - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); |
317 |
| - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &readBuffer_logs); |
318 |
| - curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcrp/0.1"); |
319 |
| - //curl_easy_setopt(curl, CURLOPT_VERBOSE, true); |
320 |
| - //ilog("log here curl: ${output}", ("output", readBuffer_logs)); |
321 |
| - curl_easy_perform(curl); |
322 |
| - |
323 |
| - http_code = 0; |
324 |
| - curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); |
325 |
| - if(http_code == 200) { |
326 |
| - // all good, do nothing |
327 |
| - } |
328 |
| - else if(http_code == 429) { |
329 |
| - // repeat request? |
330 |
| - } |
331 |
| - else { |
332 |
| - // exit everything ? |
333 |
| - } |
334 |
| - } |
335 |
| -} |
336 |
| - |
337 | 265 | } // end namespace detail
|
338 | 266 |
|
339 | 267 | elasticsearch_plugin::elasticsearch_plugin() :
|
@@ -394,6 +322,9 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
|
394 | 322 |
|
395 | 323 | void elasticsearch_plugin::plugin_startup()
|
396 | 324 | {
|
| 325 | + if(!graphene::utilities::checkES(my->curl, my->_elasticsearch_node_url)) |
| 326 | + FC_THROW_EXCEPTION(fc::exception, "ES database is not up in url ${url}", ("url", my->_elasticsearch_node_url)); |
| 327 | + ilog("elasticsearch ACCOUNT HISTORY: plugin_startup() begin"); |
397 | 328 | }
|
398 | 329 |
|
399 | 330 | } }
|
0 commit comments