1
1
# -*- coding: utf-8 -*-
2
2
3
3
import pymysql
4
- import pymysql .cursors
5
4
import struct
6
5
7
6
from pymysql .constants .COMMAND import COM_BINLOG_DUMP
7
+ from pymysql .cursors import DictCursor
8
8
from pymysql .util import int2byte
9
9
10
10
from .packet import BinLogPacketWrapper
11
11
from .constants .BINLOG import TABLE_MAP_EVENT , ROTATE_EVENT
12
12
from .gtid import GtidSet
13
- from .event import QueryEvent , RotateEvent , FormatDescriptionEvent , XidEvent , GtidEvent , NotImplementedEvent
14
- from .row_event import UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent
13
+ from .event import (
14
+ QueryEvent , RotateEvent , FormatDescriptionEvent ,
15
+ XidEvent , GtidEvent , NotImplementedEvent )
16
+ from .row_event import (
17
+ UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent )
15
18
16
19
try :
17
20
from pymysql .constants .COMMAND import COM_BINLOG_DUMP_GTID
20
23
# See: https://github.com/PyMySQL/PyMySQL/pull/261
21
24
COM_BINLOG_DUMP_GTID = 0x1e
22
25
23
- MYSQL_EXPECTED_ERROR_CODES = [2013 , 2006 ] #2013 Connection Lost
24
- #2006 MySQL server has gone away
26
+ # 2013 Connection Lost
27
+ # 2006 MySQL server has gone away
28
+ MYSQL_EXPECTED_ERROR_CODES = [2013 , 2006 ]
29
+
25
30
26
31
class BinLogStreamReader (object ):
27
32
"""Connect to replication stream and read event
@@ -31,21 +36,21 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
31
36
blocking = False , only_events = None , log_file = None , log_pos = None ,
32
37
filter_non_implemented_events = True ,
33
38
ignored_events = None , auto_position = None ,
34
- only_tables = None , only_schemas = None ,
35
- freeze_schema = False ):
39
+ only_tables = None , only_schemas = None ,
40
+ freeze_schema = False ):
36
41
"""
37
42
Attributes:
38
43
resume_stream: Start for event from position or the latest event of
39
44
binlog or from older available event
40
45
blocking: Read on stream is blocking
41
46
only_events: Array of allowed events
42
- ignored_events: Array of ignoreded events
47
+ ignored_events: Array of ignored events
43
48
log_file: Set replication start log file
44
49
log_pos: Set replication start log pos
45
50
auto_position: Use master_auto_position gtid to set position
46
51
only_tables: An array with the tables you want to watch
47
52
only_schemas: An array with the schemas you want to watch
48
- freeze_schema: If true do not support ALTER TABLE. It's faster. (default False)
53
+ freeze_schema: If true do not support ALTER TABLE. It's faster.
49
54
"""
50
55
self .__connection_settings = connection_settings
51
56
self .__connection_settings ["charset" ] = "utf8"
@@ -58,16 +63,18 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
58
63
self .__only_tables = only_tables
59
64
self .__only_schemas = only_schemas
60
65
self .__freeze_schema = freeze_schema
61
- self .__allowed_events = self ._allowed_event_list (only_events , ignored_events , filter_non_implemented_events )
66
+ self .__allowed_events = self ._allowed_event_list (
67
+ only_events , ignored_events , filter_non_implemented_events )
62
68
63
- # We can't filter on packet level TABLE_MAP and rotate event because we need
64
- # them for handling other operations
65
- self .__allowed_events_in_packet = frozenset ([TableMapEvent , RotateEvent ]).union (self .__allowed_events )
69
+ # We can't filter on packet level TABLE_MAP and rotate event because
70
+ # we need them for handling other operations
71
+ self .__allowed_events_in_packet = frozenset (
72
+ [TableMapEvent , RotateEvent ]).union (self .__allowed_events )
66
73
67
74
self .__server_id = server_id
68
75
self .__use_checksum = False
69
76
70
- #Store table meta information
77
+ # Store table meta information
71
78
self .table_map = {}
72
79
self .log_pos = log_pos
73
80
self .log_file = log_file
@@ -84,14 +91,13 @@ def close(self):
84
91
def __connect_to_ctl (self ):
85
92
self ._ctl_connection_settings = dict (self .__connection_settings )
86
93
self ._ctl_connection_settings ["db" ] = "information_schema"
87
- self ._ctl_connection_settings ["cursorclass" ] = \
88
- pymysql .cursors .DictCursor
94
+ self ._ctl_connection_settings ["cursorclass" ] = DictCursor
89
95
self ._ctl_connection = pymysql .connect (** self ._ctl_connection_settings )
90
96
self ._ctl_connection ._get_table_information = self .__get_table_information
91
97
self .__connected_ctl = True
92
98
93
99
def __checksum_enabled (self ):
94
- ''' Return True if binlog-checksum = CRC32. Only for MySQL > 5.6 '''
100
+ """ Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
95
101
cur = self ._stream_connection .cursor ()
96
102
cur .execute ("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'" )
97
103
result = cur .fetchone ()
@@ -113,7 +119,8 @@ def __connect_to_stream(self):
113
119
114
120
self .__use_checksum = self .__checksum_enabled ()
115
121
116
- #If cheksum is enabled we need to inform the server about the that we support it
122
+ # If checksum is enabled we need to inform the server about the that
123
+ # we support it
117
124
if self .__use_checksum :
118
125
cur = self ._stream_connection .cursor ()
119
126
cur .execute ("set @master_binlog_checksum= @@global.binlog_checksum" )
@@ -158,8 +165,9 @@ def __connect_to_stream(self):
158
165
# Zeroified
159
166
# binlog position uint 4bytes == 4
160
167
# payload_size uint 4bytes
161
- ## What come next, is the payload, where the slave gtid_executed
162
- ## is sent to the master
168
+
169
+ # What come next, is the payload, where the slave gtid_executed
170
+ # is sent to the master
163
171
# n_sid ulong 8bytes == which size is the gtid_set
164
172
# | sid uuid 16bytes UUID as a binary
165
173
# | n_intervals ulong 8bytes == how many intervals are sent for this gtid
@@ -179,17 +187,16 @@ def __connect_to_stream(self):
179
187
gtid_set = GtidSet (self .auto_position )
180
188
encoded_data_size = gtid_set .encoded_length
181
189
182
- header_size = (2 + # binlog_flags
183
- 4 + # server_id
184
- 4 + # binlog_name_info_size
185
- 4 + # empty binlog name
186
- 8 + # binlog_pos_info_size
187
- 4 ) # encoded_data_size
190
+ header_size = (2 + # binlog_flags
191
+ 4 + # server_id
192
+ 4 + # binlog_name_info_size
193
+ 4 + # empty binlog name
194
+ 8 + # binlog_pos_info_size
195
+ 4 ) # encoded_data_size
188
196
189
197
prelude = b'' + struct .pack ('<i' , header_size + encoded_data_size ) \
190
198
+ int2byte (COM_BINLOG_DUMP_GTID )
191
199
192
-
193
200
# binlog_flags = 0 (2 bytes)
194
201
prelude += struct .pack ('<H' , 0 )
195
202
# server_id (4 bytes)
@@ -206,7 +213,6 @@ def __connect_to_stream(self):
206
213
# encoded_data
207
214
prelude += gtid_set .encoded ()
208
215
209
-
210
216
if pymysql .__version__ < "0.6" :
211
217
self ._stream_connection .wfile .write (prelude )
212
218
self ._stream_connection .wfile .flush ()
@@ -247,7 +253,8 @@ def fetchone(self):
247
253
self .__only_schemas ,
248
254
self .__freeze_schema )
249
255
250
- if binlog_event .event_type == TABLE_MAP_EVENT and binlog_event .event is not None :
256
+ if binlog_event .event_type == TABLE_MAP_EVENT and \
257
+ binlog_event .event is not None :
251
258
self .table_map [binlog_event .event .table_id ] = \
252
259
binlog_event .event .get_table ()
253
260
@@ -274,7 +281,8 @@ def fetchone(self):
274
281
275
282
return binlog_event .event
276
283
277
- def _allowed_event_list (self , only_events , ignored_events , filter_non_implemented_events ):
284
+ def _allowed_event_list (self , only_events , ignored_events ,
285
+ filter_non_implemented_events ):
278
286
if only_events is not None :
279
287
events = set (only_events )
280
288
else :
0 commit comments