1+ #include " GTID_Server_Data.h"
12#include " MySQL_HostGroups_Manager.h"
23
34#include " ev.h"
@@ -47,21 +48,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
4748 bool rc = true ;
4849 rc = sd->readall ();
4950 if (rc == false ) {
50- // delete sd;
51- std::string s1 = sd->address ;
52- s1.append (" :" );
53- s1.append (std::to_string (sd->mysql_port ));
5451 MyHGM->gtid_missing_nodes = true ;
55- proxy_warning (" GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n " , sd->port , sd->address , sd->mysql_port );
56- std::unordered_map <string, GTID_Server_Data *>::iterator it2;
57- it2 = MyHGM->gtid_map .find (s1);
58- if (it2 != MyHGM->gtid_map .end ()) {
59- // MyHGM->gtid_map.erase(it2);
60- it2->second = NULL ;
61- delete sd;
62- }
52+ sd->active = false ;
53+ proxy_warning (" GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n " , sd->port , sd->address , sd->mysql_port );
54+
6355 ev_io_stop (MyHGM->gtid_ev_loop , w);
56+ close (w->fd );
6457 free (w);
58+ sd->w = nullptr ;
6559 } else {
6660 sd->dump ();
6761 }
@@ -71,48 +65,37 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
7165
7266void connect_cb (EV_P_ ev_io *w, int revents) {
7367 pthread_mutex_lock (&ev_loop_mutex);
74- struct ev_io * c = w;
7568 if (revents & EV_WRITE) {
76- int optval = 0 ;
77- socklen_t optlen = sizeof (optval);
78- if ((getsockopt (w->fd , SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 ) ||
79- (optval != 0 )) {
80- /* Connection failed; try the next address in the list. */
81- // int errnum = optval ? optval : errno;
82- ev_io_stop (MyHGM->gtid_ev_loop , w);
83- close (w->fd );
69+ int fd = w->fd ;
70+ GTID_Server_Data *sd = (GTID_Server_Data *)w->data ;
71+
72+ // connect() completed, this watcher is no longer needed
73+ ev_io_stop (MyHGM->gtid_ev_loop , w);
74+ free (w);
75+ sd->w = nullptr ;
76+
77+ // Based on fd status, proceed to next step -> waiting for read event on the socket
78+ int error = 0 ;
79+ socklen_t optlen = sizeof (error);
80+ int rc = getsockopt (fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
81+ if (rc == -1 || error != 0 ) {
82+ /* connection failed */
8483 MyHGM->gtid_missing_nodes = true ;
85- GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data ;
86- GTID_Server_Data *sd = custom_data;
87- std::string s1 = sd->address ;
88- s1.append (" :" );
89- s1.append (std::to_string (sd->mysql_port ));
84+ sd->active = false ;
9085 proxy_warning (" GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n " , sd->port , sd->address , sd->mysql_port );
91- std::unordered_map <string, GTID_Server_Data *>::iterator it2;
92- it2 = MyHGM->gtid_map .find (s1);
93- if (it2 != MyHGM->gtid_map .end ()) {
94- // MyHGM->gtid_map.erase(it2);
95- it2->second = NULL ;
96- delete sd;
97- }
98- // delete custom_data;
99- free (c);
86+ close (fd);
10087 } else {
101- ev_io_stop (MyHGM->gtid_ev_loop , w);
102- int fd=w->fd ;
103- struct ev_io * new_w = (struct ev_io *) malloc (sizeof (struct ev_io ));
104- new_w->data = w->data ;
105- GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data ;
106- custom_data->w = new_w;
107- free (w);
108- ev_io_init (new_w, reader_cb, fd, EV_READ);
109- ev_io_start (MyHGM->gtid_ev_loop , new_w);
88+ struct ev_io *read_watcher = (struct ev_io *) malloc (sizeof (struct ev_io ));
89+ read_watcher->data = sd;
90+ sd->w = read_watcher;
91+ ev_io_init (read_watcher, reader_cb, fd, EV_READ);
92+ ev_io_start (MyHGM->gtid_ev_loop , read_watcher);
11093 }
11194 }
11295 pthread_mutex_unlock (&ev_loop_mutex);
11396}
11497
115- struct ev_io * new_connector (char *address, uint16_t gtid_port, uint16_t mysql_port) {
98+ struct ev_io * new_connect_watcher (char *address, uint16_t gtid_port, uint16_t mysql_port) {
11699 int s;
117100
118101 if ((s = socket (AF_INET, SOCK_STREAM, 0 )) == -1 ) {
@@ -149,17 +132,13 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
149132 struct ev_io *c = (struct ev_io *)malloc (sizeof (struct ev_io ));
150133 if (c) {
151134 ev_io_init (c, connect_cb, s, EV_WRITE);
152- GTID_Server_Data * custom_data = new GTID_Server_Data (c, address, gtid_port, mysql_port);
153- c->data = (void *)custom_data;
154135 return c;
155136 }
156137 /* else error */
157138 }
158139 return NULL ;
159140}
160141
161-
162-
163142GTID_Server_Data::GTID_Server_Data (struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
164143 active = true ;
165144 w = _w;
@@ -188,26 +167,32 @@ GTID_Server_Data::~GTID_Server_Data() {
188167}
189168
190169bool GTID_Server_Data::readall () {
191- bool ret = true ;
192170 if (size == len) {
193171 // buffer is full, expand
194- resize (len* 2 );
172+ resize (len * 2 );
195173 }
174+
196175 int rc = 0 ;
197- rc = read (w->fd ,data+len,size-len);
176+ rc = read (w->fd , data+len, size-len);
198177 if (rc > 0 ) {
199178 len += rc;
179+ return true ;
180+ }
181+
182+ int myerr = errno;
183+ if (rc == 0 ) {
184+ proxy_info (" Read returned EOF\n " );
185+ return false ;
186+ }
187+
188+ // rc == -1
189+ proxy_error (" Read failed, error %d\n " , myerr);
190+ if (myerr == EINTR || myerr == EAGAIN) {
191+ // non-blocking fd, so this should not be considered as an error
192+ return true ;
200193 } else {
201- int myerr = errno;
202- proxy_error (" Read returned %d bytes, error %d\n " , rc, myerr);
203- if (
204- (rc == 0 ) ||
205- (rc==-1 && myerr != EINTR && myerr != EAGAIN)
206- ) {
207- ret = false ;
208- }
194+ return false ;
209195 }
210- return ret;
211196}
212197
213198
@@ -239,9 +224,6 @@ void GTID_Server_Data::dump() {
239224 return ;
240225 }
241226 read_all_gtids ();
242- // int rc = write(1,data+pos,len-pos);
243- fflush (stdout);
244- // /pos += rc;
245227 if (pos >= len/2 ) {
246228 memmove (data,data+pos,len-pos);
247229 len = len-pos;
@@ -285,13 +267,12 @@ bool GTID_Server_Data::read_next_gtid() {
285267 bs[l-3 ] = ' \0 ' ;
286268 char *saveptr1=NULL ;
287269 char *saveptr2=NULL ;
288- // char *saveptr3=NULL;
289270 char *token = NULL ;
290271 char *subtoken = NULL ;
291- // char *subtoken2 = NULL;
292272 char *str1 = NULL ;
293273 char *str2 = NULL ;
294- // char *str3 = NULL;
274+ bool updated = false ;
275+
295276 for (str1 = bs; ; str1 = NULL ) {
296277 token = strtok_r (str1, " ," , &saveptr1);
297278 if (token == NULL ) {
@@ -312,56 +293,48 @@ bool GTID_Server_Data::read_next_gtid() {
312293 p++;
313294 }
314295 }
315- // fprintf(stdout,"BS from %s\n", uuid_server);
316296 } else { // we are reading the trxids
317297 uint64_t trx_from;
318298 uint64_t trx_to;
319299 sscanf (subtoken," %lu-%lu" ,&trx_from,&trx_to);
320- // fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
321- std::string s = uuid_server;
322- gtid_executed[s].emplace_back (trx_from, trx_to);
300+ updated = addGtidInterval (gtid_executed, uuid_server, trx_from, trx_to) || updated;
323301 }
324302 }
325303 }
326304 pos += l+1 ;
327305 free (bs);
328- // return true;
306+
307+ if (updated) {
308+ events_read++;
309+ }
329310 } else {
330311 strncpy (rec_msg,data+pos,l);
331312 pos += l+1 ;
332313 rec_msg[l] = 0 ;
333- // int rc = write(1,data+pos,l+1);
334- // fprintf(stdout,"%s\n", rec_msg);
335314 if (rec_msg[0 ]==' I' ) {
336- // char rec_uuid[80];
337315 uint64_t rec_trxid = 0 ;
338316 char *a = NULL ;
339317 int ul = 0 ;
340318 switch (rec_msg[1 ]) {
341319 case ' 1' :
342- // sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
343320 a = strchr (rec_msg+3 ,' :' );
344321 ul = a-rec_msg-3 ;
345322 strncpy (uuid_server,rec_msg+3 ,ul);
346323 uuid_server[ul] = 0 ;
347324 rec_trxid=atoll (a+1 );
348325 break ;
349326 case ' 2' :
350- // sscanf(rec_msg+3,"%lu",&rec_trxid);
351327 rec_trxid=atoll (rec_msg+3 );
352328 break ;
353329 default :
354330 break ;
355331 }
356- // fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
357332 std::string s = uuid_server;
358333 gtid_t new_gtid = std::make_pair (s,rec_trxid);
359334 addGtid (new_gtid,gtid_executed);
360335 events_read++;
361- // return true;
362336 }
363337 }
364- // std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
365338 return true ;
366339}
367340
@@ -439,6 +412,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
439412 }
440413}
441414
415+ bool addGtidInterval (gtid_set_t & gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
416+ bool updated = true ;
417+
418+ auto it = gtid_executed.find (server_uuid);
419+ if (it == gtid_executed.end ()) {
420+ gtid_executed[server_uuid].emplace_back (txid_start, txid_end);
421+ return updated;
422+ }
423+
424+ bool insert = true ;
425+
426+ // When ProxySQL reconnects with binlog reader, it might
427+ // receive updated txid intervals in the bootstrap message.
428+ // For example,
429+ // before disconnection -> server_UUID:1-10
430+ // after reconnection -> server_UUID:1-19
431+ auto &txid_intervals = it->second ;
432+ for (auto &interval : txid_intervals) {
433+ if (interval.first == txid_start) {
434+ if (interval.second == txid_end) {
435+ updated = false ;
436+ } else {
437+ interval.second = txid_end;
438+ }
439+ insert = false ;
440+ break ;
441+ }
442+ }
443+
444+ if (insert) {
445+ txid_intervals.emplace_back (txid_start, txid_end);
446+
447+ }
448+
449+ return updated;
450+ }
451+
442452void * GTID_syncer_run () {
443453 // struct ev_loop * gtid_ev_loop;
444454 // gtid_ev_loop = NULL;
0 commit comments