Skip to content

Commit

Permalink
Wal2json format v2 (#91)
Browse files Browse the repository at this point in the history
* wip - using wal2json format version 2

* Removing debug comments and ipdb's

* Pylint

* Catch DataError on trying format v2

* Try to read a message to check version 2 support

* Fix small logic issue

* Set message_format to switch implementation

* Add sleep to test theory

* Saving progress on backoff pattern, reverting next

* Move to conn_info message_format

* Make config parameter more explicit

* move tests into tap-postgres repo

* Fix unittests imports

* Move tests to integration subdirectory

* Revert previous commit and fix test discovery issue

* Change to correct integration tests path

* Clean up comments, add explicit connection property for wal2json, copy test for wal2json v2

* Change new test name to match canonicalized version

* Add log line when using format-version 2

* Pull new message-format out of config

* Move to docker postgres host

* Fix tests, bugs and incorrect expectations

* Ensure DB exists for unittests

* Review comments

* Remove .sample and pylint

Co-authored-by: Kyle Allan <[email protected]>
  • Loading branch information
dmosorast and Kyle Allan authored Jun 30, 2020
1 parent f4dab2d commit f8cd6b5
Show file tree
Hide file tree
Showing 26 changed files with 5,109 additions and 73 deletions.
16 changes: 16 additions & 0 deletions .circleci/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Test image: PostgreSQL 9.6 with the wal2json logical-decoding plugin compiled in.
FROM postgres:9.6

# Git SHA of v2.2
ENV WAL2JSON_COMMIT_ID=9f9762315062888f7f7f4f0a115073a33ad1275e

# Compile the plugins from sources and install
# NB: single RUN keeps the build toolchain and clone out of extra image layers
RUN apt-get update && apt-get install -y postgresql-server-dev-9.6 gcc git make pkgconf \
&& git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& (cd /wal2json && git checkout $WAL2JSON_COMMIT_ID && make && make install) \
&& rm -rf wal2json

# Copy the custom configuration which will be passed down to the server
# (selected at runtime via: postgres -c config_file=/usr/local/share/postgresql/postgresql.conf)
COPY postgresql.conf /usr/local/share/postgresql/postgresql.conf

# Copy the script which will initialize the replication permissions
COPY /docker-entrypoint-initdb.d /docker-entrypoint-initdb.d
27 changes: 17 additions & 10 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: singerio/postgres:9.6-wal2json-2.2
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
command: [postgres, -c, config_file=/usr/local/share/postgresql/postgresql.conf]
steps:
- checkout
- run:
Expand All @@ -25,18 +30,19 @@ jobs:
command: |
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-a-test --tap=tap-postgres \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tap_tester.suites.postgres
run-test --tap=tap-postgres \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
workflows:
version: 2
commit:
jobs:
- build
- build:
context: circleci-user
build_daily:
triggers:
- schedule:
Expand All @@ -46,4 +52,5 @@ workflows:
only:
- master
jobs:
- build
- build:
context: circleci-user
4 changes: 4 additions & 0 deletions .circleci/docker-entrypoint-initdb.d/init-permissions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

# Allow replication connections for the configured user from any host
# (trust auth — acceptable only because this is a throwaway test container).
echo "host replication $POSTGRES_USER 0.0.0.0/0 trust" >> "$PGDATA/pg_hba.conf"
15 changes: 15 additions & 0 deletions .circleci/postgresql.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Custom PostgreSQL configuration enabling logical replication for wal2json tests.
# LOGGING
log_min_error_statement = fatal

# CONNECTION
listen_addresses = '*'

# MODULES
#shared_preload_libraries = 'decoderbufs'

# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 5 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 5 # max number of replication slots (change requires restart)
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
test:
nosetests -v
nosetests -v tests/unittests
85 changes: 85 additions & 0 deletions bin/test-db
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
import os
import sys
import argparse
import subprocess
import time
from argparse import RawTextHelpFormatter

full_image_name = "singerio/postgres:9.6-wal2json-2.2"

def start_container(name):
    """Start the postgres test container in the background.

    Reads TAP_POSTGRES_USER / TAP_POSTGRES_PASSWORD from the environment and
    exits the process with an error message if docker fails to start.

    name: docker container name to assign.
    """
    # NB: lowercase local name (it is not a module constant); shell=True is
    # needed for the sudo docker invocation, and all interpolated values come
    # from trusted environment variables, not untrusted user input.
    start_command = """
    sudo docker run -e "POSTGRES_USER={0}" -e "POSTGRES_PASSWORD={1}" -p {2}:{2} --name {3} -d {4} \
    postgres -c config_file=/usr/local/share/postgresql/postgresql.conf
    """.format(os.getenv('TAP_POSTGRES_USER'),
               os.getenv('TAP_POSTGRES_PASSWORD'),
               5432,
               name,
               full_image_name)

    print("Starting Docker process {} using command: {}".format(name, start_command))

    proc = subprocess.run(start_command, shell=True)
    if proc.returncode != 0:
        sys.exit("Exited with code: {}, the docker process failed to start.".format(proc.returncode))
    print("Process started successfully.")

def get_ip_addr(name):
    """Return the IP address of the named docker container as a string.

    Uses docker's built-in --format templating instead of piping through jq,
    so the host does not need jq installed.
    """
    # Quadruple braces render as literal {{...}} for the Go template after .format()
    ip_addr_command = "docker inspect --format '{{{{.NetworkSettings.IPAddress}}}}' {}"
    print("Retrieving IP addr of postgres container")
    ip_addr = subprocess.check_output(ip_addr_command.format(name), shell=True).decode('utf-8').rstrip()
    print(ip_addr)
    return ip_addr


def stop_container(name):
    """Stop and remove the named docker container, exiting on failure."""
    stop_command = "sudo docker stop {0} && sudo docker rm {0}".format(name)

    print("Stopping Docker process {}".format(name))
    proc = subprocess.run(stop_command, shell=True)
    if proc.returncode != 0:
        sys.exit("Exited with code: {}, the docker process failed to stop.".format(proc.returncode))
    print("Process stopped successfully")

def connect_to_db(name):
    """Open an interactive psql session against the running test container.

    Launches a throwaway postgres container to supply the psql client,
    blocks until the session ends, and exits the process if psql fails.
    """
    command_template = 'docker run -it --rm -e "PGPASSWORD={}" {} psql --host {} -U {}'

    container_ip = get_ip_addr(name)

    print("Attempting to connect to running container using a postgres container via psql")
    connect_command = command_template.format(os.getenv('TAP_POSTGRES_PASSWORD'),
                                              full_image_name,
                                              container_ip,
                                              os.getenv('TAP_POSTGRES_USER'))
    print(connect_command)
    # NB: subprocess.call blocks until psql exits and returns only an exit code.
    exit_code = subprocess.call(connect_command, shell=True)
    if exit_code != 0:
        sys.exit("Exited with code: {}, could not connect.".format(exit_code))

# Help text shown by argparse; RawTextHelpFormatter preserves these line breaks.
DESCRIPTION = """
Manage docker instance for tap-postgres testing.
Uses environment variables:
TAP_POSTGRES_USER
TAP_POSTGRES_PASSWORD
"""
# CLI: one positional action plus an optional container name.
parser = argparse.ArgumentParser(description=DESCRIPTION, formatter_class=RawTextHelpFormatter)
parser.add_argument('action', choices=['start','stop', 'connect'], help='action to perform with the container')
parser.add_argument('--name', help="name assigned to running docker process", default='postgres1')

def main():
    """Parse CLI arguments and dispatch to the matching container operation."""
    parsed_args = parser.parse_args()
    # Potential arguments to add: pull, changing docker container, changing password
    # argparse `choices` guarantees the action is one of these keys.
    handlers = {
        'start': start_container,
        'stop': stop_container,
        'connect': connect_to_db,
    }
    handlers[parsed_args.action](parsed_args.name)

if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,8 @@ def main_impl():
'dbname' : args.config['dbname'],
'filter_dbs' : args.config.get('filter_dbs'),
'debug_lsn' : args.config.get('debug_lsn') == 'true',
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0))}
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
'wal2json_message_format': args.config.get('wal2json_message_format')}

if args.config.get('ssl') == 'true':
conn_config['sslmode'] = 'require'
Expand Down
103 changes: 92 additions & 11 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,68 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map,
version=version,
time_extracted=time_extracted)

def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
payload = json.loads(msg.payload)
lsn = msg.data_start
def consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn):
## Action Types:
# I = Insert
# U = Update
# D = Delete
# B = Begin Transaction
# C = Commit Transaction
# M = Message
# T = Truncate
action = payload['action']
if action not in ['U', 'I', 'D']:
LOGGER.debug("Skipping message of type %s", action)
yield None
else:
tap_stream_id = post_db.compute_tap_stream_id(conn_info['dbname'], payload['schema'], payload['table'])
if streams_lookup.get(tap_stream_id) is None:
yield None
else:
target_stream = streams_lookup[tap_stream_id]
stream_version = get_stream_version(target_stream['tap_stream_id'], state)
stream_md_map = metadata.to_map(target_stream['metadata'])

streams_lookup = {}
for s in streams:
streams_lookup[s['tap_stream_id']] = s
desired_columns = [col for col in target_stream['schema']['properties'].keys() if sync_common.should_sync_column(stream_md_map, col)]

col_names = []
col_vals = []
if payload['action'] in ['I', 'U']:
for column in payload['columns']:
if column['name'] in set(desired_columns):
col_names.append(column['name'])
col_vals.append(column['value'])

col_names = col_names + ['_sdc_deleted_at']
col_vals = col_vals + [None]

if conn_info.get('debug_lsn'):
col_names = col_names + ['_sdc_lsn']
col_vals = col_vals + [str(lsn)]

elif payload['action'] == 'D':
for column in payload['identity']:
if column['name'] in set(desired_columns):
col_names.append(column['name'])
col_vals.append(column['value'])

col_names = col_names + ['_sdc_deleted_at']
col_vals = col_vals + [singer.utils.strftime(singer.utils.strptime_to_utc(payload['timestamp']))]

if conn_info.get('debug_lsn'):
col_vals = col_vals + [str(lsn)]
col_names = col_names + ['_sdc_lsn']

# Yield 1 record to match the API of V1
yield row_to_singer_message(target_stream, col_vals, stream_version, col_names, time_extracted, stream_md_map, conn_info)

state = singer.write_bookmark(state,
target_stream['tap_stream_id'],
'lsn',
lsn)

# message-format v1
def consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn):
for c in payload['change']:
tap_stream_id = post_db.compute_tap_stream_id(conn_info['dbname'], c['schema'], c['table'])
if streams_lookup.get(tap_stream_id) is None:
Expand Down Expand Up @@ -288,15 +342,33 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
raise Exception("unrecognized replication operation: {}".format(c['kind']))


singer.write_message(record_message)
yield record_message
state = singer.write_bookmark(state,
target_stream['tap_stream_id'],
'lsn',
lsn)


def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn, message_format="1"):
payload = json.loads(msg.payload)
lsn = msg.data_start

streams_lookup = {s['tap_stream_id']: s for s in streams}

if message_format == "1":
records = consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn)
elif message_format == "2":
records = consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn)
else:
raise Exception("Unknown wal2json message format version: {}".format(message_format))

for record_message in records:
if record_message:
singer.write_message(record_message)
# Pulled out of refactor so we send a keep-alive per-record
LOGGER.debug("sending feedback to server with NO flush_lsn. just a keep-alive")
msg.cursor.send_feedback()


LOGGER.debug("sending feedback to server. flush_lsn = %s", msg.data_start)
if msg.data_start > end_lsn:
raise Exception("incorrectly attempting to flush an lsn({}) > end_lsn({})".format(msg.data_start, end_lsn))
Expand Down Expand Up @@ -339,8 +411,17 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
with post_db.open_connection(conn_info, True) as conn:
with conn.cursor() as cur:
LOGGER.info("Starting Logical Replication for %s(%s): %s -> %s. poll_total_seconds: %s", list(map(lambda s: s['tap_stream_id'], logical_streams)), slot, start_lsn, end_lsn, poll_total_seconds)

replication_params = {"slot_name": slot,
"decode": True,
"start_lsn": start_lsn}
message_format = conn_info.get("wal2json_message_format") or "1"
if message_format == "2":
LOGGER.info("Using wal2json format-version 2")
replication_params["options"] = {"format-version": 2, "include-timestamp": True}

try:
cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn)
cur.start_replication(**replication_params)
except psycopg2.ProgrammingError:
raise Exception("unable to start replication with logical replication slot {}".format(slot))

Expand All @@ -358,13 +439,13 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
LOGGER.info("gone past end_lsn %s for run. breaking", end_lsn)
break

state = consume_message(logical_streams, state, msg, time_extracted, conn_info, end_lsn)
state = consume_message(logical_streams, state, msg, time_extracted,
conn_info, end_lsn, message_format=message_format)
#msg has been consumed. it has been processed
last_lsn_processed = msg.data_start
rows_saved = rows_saved + 1
if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))

else:
now = datetime.datetime.now()
timeout = keep_alive_time - (now - cur.io_timestamp).total_seconds()
Expand Down
24 changes: 24 additions & 0 deletions tests/db_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import psycopg2

def ensure_db(dbname=None):
    """Create the named database if it does not already exist.

    dbname: database to ensure; defaults to TAP_POSTGRES_DBNAME, read at call
    time (not import time) so environment changes are picked up.
    """
    if dbname is None:
        dbname = os.getenv('TAP_POSTGRES_DBNAME')
    with get_test_connection() as conn:
        # CREATE DATABASE cannot run inside a transaction block
        conn.autocommit = True
        with conn.cursor() as cur:
            # Parameterized query instead of interpolating the name into SQL text
            cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (dbname,))
            if cur.fetchone() is None:
                print("Creating database {}".format(dbname))
                # Identifiers cannot be parameterized; quote defensively instead
                cur.execute('CREATE DATABASE "{}"'.format(dbname.replace('"', '""')))

def get_test_connection(dbname=None, logical_replication=False):
    """Return a psycopg2 connection to the test postgres instance.

    dbname: database to connect to; defaults to TAP_POSTGRES_DBNAME, read at
        call time rather than import time.
    logical_replication: when True, use a LogicalReplicationConnection factory.
    """
    if dbname is None:
        dbname = os.getenv('TAP_POSTGRES_DBNAME')
    conn_string = "host='{}' dbname='{}' user='{}' password='{}' port='{}'".format(os.getenv('TAP_POSTGRES_HOST'),
                                                                                   dbname,
                                                                                   os.getenv('TAP_POSTGRES_USER'),
                                                                                   os.getenv('TAP_POSTGRES_PASSWORD'),
                                                                                   os.getenv('TAP_POSTGRES_PORT'))
    if logical_replication:
        # Bug fix: `import psycopg2` alone does not load the extras submodule,
        # so referencing psycopg2.extras here would raise AttributeError.
        import psycopg2.extras
        return psycopg2.connect(conn_string, connection_factory=psycopg2.extras.LogicalReplicationConnection)
    return psycopg2.connect(conn_string)
Loading

0 comments on commit f8cd6b5

Please sign in to comment.