Skip to content
This repository was archived by the owner on Dec 22, 2020. It is now read-only.

Support more advanced schemas, including foreign keys #131

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f92a538
Small type updates and support for schemas
awreece Nov 7, 2017
31b82d0
Support nested objects with foreign keys
awreece Nov 7, 2017
b001d8c
Stricter error checking
awreece Nov 13, 2017
8f7c22f
Better support for alternative primary keys
awreece Nov 13, 2017
677175c
no active record
awreece Nov 14, 2017
3433099
fix timestamp
joshbeam Nov 14, 2017
68d67dd
add run script for migration
joshbeam Nov 14, 2017
38d5d4e
fix run script
joshbeam Nov 14, 2017
18f0433
do alex’s run script
joshbeam Nov 14, 2017
1ee7f02
looser type checking
awreece Nov 14, 2017
0919a8b
parallel import
awreece Nov 14, 2017
2d7950d
fix sanity check
joshbeam Nov 14, 2017
9e10bfb
allow subtables of composite key tables
awreece Nov 14, 2017
0b00ea0
properly find nested names
awreece Nov 15, 2017
d1d25c0
parent keys come before child keys in tables
awreece Nov 16, 2017
30f1bdf
case sensitive
awreece Nov 18, 2017
ac49560
limit max parallelism
awreece Nov 18, 2017
537910a
bigger pool
awreece Nov 22, 2017
98e282f
print using original
awreece Nov 27, 2017
7426323
unconditionally save tail state on initial import
awreece Dec 1, 2017
1818ecf
fix primary key handling for sub tables
awreece Dec 1, 2017
5aacf66
:name to be a to_sym
awreece Dec 1, 2017
322f876
always store types as syms
awreece Dec 2, 2017
9e97eaa
can't log from a signal handler
awreece Dec 4, 2017
dc6684e
puts in signal handler is ok
awreece Dec 8, 2017
a4aa002
types can be numerics
awreece Dec 9, 2017
db5c8b1
better handle delete op
awreece Dec 17, 2017
55a0ba2
more likely to quit
awreece Dec 18, 2017
04eaa69
bail out early if we fail to import a collection
awreece Dec 26, 2017
c2bb55c
better error message
awreece Dec 29, 2017
55b820d
don't bail out early
awreece Dec 29, 2017
6ec11c9
ignore vendor
awreece Dec 29, 2017
0f19c62
next instead of break
awreece Dec 29, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
collections.yml
/.bundle/
Gemfile.lock
vendor
7 changes: 7 additions & 0 deletions bin/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

# Launcher for mosql: puts the project's lib/ directory on Ruby's load
# path, then replaces this shell with the real mosql executable.

# Resolve the directory containing this script without depending on an
# external python interpreter; cd + pwd -P gives a symlink-resolved
# absolute path. Quoting keeps paths with spaces intact.
scriptdir=$(cd -- "$(dirname -- "$0")" && pwd -P)
rootdir=$scriptdir/..

# Prepend lib/ to RUBYLIB; ${RUBYLIB:+:$RUBYLIB} only adds the separator
# when RUBYLIB was already set, avoiding a trailing-colon (empty) entry.
export RUBYLIB="$rootdir/lib${RUBYLIB:+:$RUBYLIB}"
exec "$rootdir/bin/mosql" "$@"
4 changes: 2 additions & 2 deletions lib/mosql/cli.rb
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ def initialize(args)
def setup_signal_handlers
%w[TERM INT USR2].each do |sig|
Signal.trap(sig) do
log.info("Got SIG#{sig}. Preparing to exit...")
puts("Got SIG#{sig}. Preparing to exit...")
@streamer.stop
end
end
@@ -121,7 +121,7 @@ def parse_args
end

def connect_mongo
@mongo = Mongo::MongoClient.from_uri(options[:mongo])
@mongo = Mongo::MongoClient.from_uri(options[:mongo], :pool_size => 8)
config = @mongo['admin'].command(:ismaster => 1)
if !config['setName'] && !options[:skip_tail]
log.warn("`#{options[:mongo]}' is not a replset.")
373 changes: 255 additions & 118 deletions lib/mosql/schema.rb

Large diffs are not rendered by default.

35 changes: 9 additions & 26 deletions lib/mosql/sql.rb
Original file line number Diff line number Diff line change
@@ -22,41 +22,24 @@ def connect_db(uri, pgschema)
end)
end

def table_for_ns(ns)
@db[@schema.table_for_ns(ns).intern]
end

def transform_one_ns(ns, obj)
h = {}
cols = @schema.all_columns(@schema.find_ns(ns))
row = @schema.transform(ns, obj)
cols.zip(row).each { |k,v| h[k] = v }
h
def table_for_ident(ident)
@db[ident]
end

def upsert_ns(ns, obj)
h = transform_one_ns(ns, obj)
upsert!(table_for_ns(ns), @schema.primary_sql_key_for_ns(ns), h)
@schema.all_transforms_for_ns(ns, [obj]) do |table, pks, row|
upsert!(table_for_ident(table), pks, row)
end
end

def delete_ns(ns, obj)
primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
h = transform_one_ns(ns, obj)
query = {}
primary_sql_keys.each do |key|
raise "No #{primary_sql_keys} found in transform of #{obj.inspect}" if h[key].nil?
query[key.to_sym] = h[key]
end

table_for_ns(ns).where(query).delete
table = table_for_ident(@schema.primary_table_name_for_ns(ns))
pks = @schema.primary_sql_keys_for_ns_obj(ns, obj)
table.where(pks).delete
end

def upsert!(table, table_primary_keys, item)
query = {}
table_primary_keys.each do |key|
query[key.to_sym] = item[key]
end
rows = table.where(query).update(item)
rows = table.where(table_primary_keys).update(item)
if rows == 0
begin
table.insert(item)
97 changes: 48 additions & 49 deletions lib/mosql/streamer.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'parallel'

module MoSQL
class Streamer
include MoSQL::Logging
@@ -49,25 +51,14 @@ def unsafe_handle_exceptions(ns, obj)
end

def bulk_upsert(table, ns, items)
begin
@schema.copy_data(table.db, ns, items)
rescue Sequel::DatabaseError => e
log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
cols = @schema.all_columns(@schema.find_ns(ns))
items.each do |it|
h = {}
cols.zip(it).each { |k,v| h[k] = v }
unsafe_handle_exceptions(ns, h) do
@sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
end
end
end
table.multi_insert(items)
end

def with_retries(tries=10)
tries.times do |try|
begin
yield
break
rescue Mongo::ConnectionError, Mongo::ConnectionFailure, Mongo::OperationFailure => e
# Duplicate key error
raise if e.kind_of?(Mongo::OperationFailure) && [11000, 11001].include?(e.error_code)
@@ -89,12 +80,10 @@ def track_time
def initial_import
@schema.create_schema(@sql.db, !options[:no_drop_tables])

unless options[:skip_tail]
start_state = {
'time' => nil,
'position' => @tailer.most_recent_position
}
end
start_state = {
'time' => nil,
'position' => @tailer.most_recent_position
}

dbnames = []

@@ -117,52 +106,71 @@ def initial_import
db = @mongo.db(dbname)
collections = db.collections.select { |c| spec.key?(c.name) }

collections.each do |collection|
Parallel.each(collections, in_threads: 4) do |collection|
ns = "#{dbname}.#{collection.name}"
import_collection(ns, collection, spec[collection.name][:meta][:filter])
begin
import_collection(ns, collection, spec[collection.name][:meta][:filter])
rescue Exception => ex
log.error("Error importing collection #{ns} - #{ex.message}:\n#{ex.backtrace.join("\n")}")
end
exit(0) if @done
end
end

tailer.save_state(start_state) unless options[:skip_tail]
tailer.save_state(start_state)
end

def did_truncate; @did_truncate ||= {}; end

def upsert_all_batches(batches, ns)
# We use all_table_names_for_ns so we can ensure we write the parent table
# before we write the child.
sql_time = 0
@schema.all_table_names_for_ns(ns).map do |table_name|
unless batches[table_name].empty?
sql_time += track_time do
bulk_upsert(@sql.table_for_ident(table_name), ns,
batches[table_name])
batches[table_name].clear
end
end
end
sql_time
end

def import_collection(ns, collection, filter)
log.info("Importing for #{ns}...")
count = 0
batch = []
table = @sql.table_for_ns(ns)
batches = Hash[@schema.all_table_names_for_ns(ns).map { |n| [n, []] }]
table = @sql.table_for_ident(@schema.primary_table_name_for_ns(ns))
unless options[:no_drop_tables] || did_truncate[table.first_source]
table.truncate
table.truncate :cascade => true
did_truncate[table.first_source] = true
end

start = Time.now
sql_time = 0
collection.find(filter, :batch_size => BATCH) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
@schema.all_transforms_for_ns(ns, cursor) do |ident, _, row|
table = @sql.table_for_ident(ident)
count += 1
batches[ident] << row

if batch.length >= BATCH
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
if count % BATCH == 0
sql_time += upsert_all_batches(batches, ns)
elapsed = Time.now - start
log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
batch.clear
exit(0) if @done
log.info("Imported #{count} rows into #{ns} (#{elapsed}s, #{sql_time}s SQL)...")
end
exit(0) if @done
end
end
end

unless batch.empty?
bulk_upsert(table, ns, batch)
end
sql_time += upsert_all_batches(batches, ns)

elapsed = Time.now - start
log.info("Finished import of #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
end

def optail
@@ -172,10 +180,11 @@ def optail
end
tailer.tail(:from => tail_from, :filter => options[:oplog_filter])
until @done
tailer.stream(1000) do |op|
tailer.stream(5) do |op|
handle_op(op)
end
end
tailer.save_state
end

def sync_object(ns, selector)
@@ -231,22 +240,12 @@ def handle_op(op)
log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
sync_object(ns, selector)
else

# The update operation replaces the existing object, but
# preserves its _id field, so grab the _id off of the
# 'query' field -- it's not guaranteed to be present on the
# update.
primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
schema = @schema.find_ns!(ns)
keys = {}
primary_sql_keys.each do |key|
source = schema[:columns].find {|c| c[:name] == key }[:source]
keys[source] = selector[source]
end

log.debug("upsert #{ns}: #{keys}")
update = @schema.save_all_pks_for_ns(ns, update, selector)

update = keys.merge(update)
unsafe_handle_exceptions(ns, update) do
@sql.upsert_ns(ns, update)
end
2 changes: 1 addition & 1 deletion lib/mosql/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module MoSQL
VERSION = "0.4.3"
VERSION = "0.4.5"
end
1 change: 1 addition & 0 deletions mosql.gemspec
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "rake"
gem.add_runtime_dependency "log4r"
gem.add_runtime_dependency "json"
gem.add_runtime_dependency "parallel"

gem.add_runtime_dependency "mongoriver", "0.4"

4 changes: 2 additions & 2 deletions test/unit/lib/mosql/schema.rb
Original file line number Diff line number Diff line change
@@ -104,8 +104,8 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
end

it 'Can find the primary key of the SQL table' do
assert_equal(['id'], @map.primary_sql_key_for_ns('db.collection'))
assert_equal(['_id'], @map.primary_sql_key_for_ns('db.old_conf_syntax'))
assert_equal(['id'], @map.primary_sql_keys_for_ns('db.collection'))
assert_equal(['_id'], @map.primary_sql_keys_for_ns('db.old_conf_syntax'))
end

it 'can create a SQL schema' do