
Commit cae382c

Initial commit

0 parents  commit cae382c

18 files changed, +1080 -0 lines changed

.gitignore

+11

@@ -0,0 +1,11 @@

# Vim swap files
.*.sw[a-z]

# Snapshots, WAL files, logs, pid files
*.snap
*.xlog
*.log
*.pid

# uuid files that are used to bootstrap the vshard cluster
chunked_example_fast/*.uuid

README.md

+293

@@ -0,0 +1,293 @@

## Overview

The basic information on the merger API is provided in the commit message
where the merger was introduced. Please refer to it first. This document and
the other repository content expand the API description with usage examples.

TBD: paste a link.

## API and basic usage

The basic case of using the merger is when there are M storages and data are
partitioned (sharded) across them. A client wants to fetch a tuple stream from
each storage and merge them into one tuple stream:

```lua
local msgpack = require('msgpack')
local net_box = require('net.box')
local buffer = require('buffer')
local key_def = require('key_def')
local merger = require('merger')

-- Prepare M connections.
local net_box_opts = {reconnect_after = 0.1}
local connects = {
    net_box.connect('localhost:3301', net_box_opts),
    net_box.connect('localhost:3302', net_box_opts),
    ...
    net_box.connect('localhost:<...>', net_box_opts),
}

-- Set key parts from an index.
-- See the 'How to form key parts' section below.
local key_parts = {}
local space = connects[1].space.<...>
local index = space.index.<...>
local key_def_inst = key_def.new(index.parts)
if not index.unique then
    key_def_inst = key_def_inst:merge(key_def.new(space.index[0].parts))
end

-- Create a merger context.
-- NB: It is worth caching it.
local ctx = merger.context.new(key_def_inst)

-- Prepare M sources.
local sources = {}
for _, conn in ipairs(connects) do
    local buf = buffer.ibuf()
    conn.space.<...>.index.<...>:select(<...>, {buffer = buf,
        skip_header = true})
    table.insert(sources, merger.new_source_frombuffer(buf))
end

-- Merge.
local merger_inst = merger.new(ctx, sources)
local res = merger_inst:select()
```

## How to form key parts

The merger expects that each input tuple stream is sorted in the order that is
required for the result (via key parts and the `reverse` flag). It performs a
kind of merge sort: it chooses the source with the minimal / maximal tuple on
each step, consumes a tuple from this source and repeats.

A :select() or a :pairs() on a space gives tuples in the order that
corresponds to an index. Key parts from this index should be used to perform a
merge of such select results.

A secondary non-unique index sorts tuples that are equal by the parts of the
index according to the primary index order (we can imagine that as if a
non-unique index had hidden key parts copied from the primary index).

So when one performs a merge of select results received via a non-unique
index, the primary index key parts should be appended after the non-unique
index key parts. The example above shows this approach using the
`key_def_inst:merge()` method.
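
For instance, here is a minimal sketch for a hypothetical space with a unique
primary index on an `id` field and a non-unique secondary index on an `age`
field (the part definitions are illustrative, not from the repository
examples):

```lua
local key_def = require('key_def')

-- Hypothetical index part definitions.
local secondary_parts = {{fieldno = 2, type = 'unsigned'}} -- age (non-unique)
local primary_parts = {{fieldno = 1, type = 'unsigned'}}   -- id (unique)

-- Tuples that are equal by age arrive ordered by id, so append the
-- primary index key parts after the secondary index key parts.
local key_def_inst = key_def.new(secondary_parts)
    :merge(key_def.new(primary_parts))
```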
## Preparing buffers

### In short

* Use the `skip_header = true` option for a `:select()` net.box request.
* In addition, use the `msgpack.decode_array()` function to postprocess a
  net.box :call() result.

See the example in the 'API and basic usage' section for the former bullet
and the example in the 'Multiplexing requests' section for the latter one.

### In detail

We'll use the symbol T below to represent an msgpack array that corresponds
to a tuple.

A select response has the following structure: `{[48] = {T, T, ...}}`, while
a call response is `{[48] = {{T, T, ...}}}` (because it should support
multiple return values). A user should skip the extra headers and pass a
buffer whose read position is on `{T, T, ...}` to a merger.

Note: the `{[48] = ...}` wrapper is referred to below as the iproto_data
header.

Typical headers are the following:

Cases            | Buffer structure
---------------- | ----------------
raw data         | {T, T, ...}
net.box select   | {[48] = {T, T, ...}}
net.box call     | {[48] = {{T, T, ...}}}

See also the following docbot requests:

* Non-recursive msgpack decoding functions.
* net.box: skip_header option.

XXX: add links.
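
The sketch below shows how a client could position the read pointer of a
:call() result on `{T, T, ...}` before passing the buffer to a merger. It is
illustrative, not a repository example: it assumes that
`msgpack.decode_array()` (named `msgpack.decode_array_header()` in released
Tarantool versions) takes a read position and a size and returns the array
length plus the new read position, and that `skip_header = true` strips the
iproto_data header from a :call() response as well. `foo` is a stored
function as in the check script below.

```lua
local buffer = require('buffer')
local msgpack = require('msgpack')
local net_box = require('net.box')

local conn = net_box.connect('localhost:3301')
local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf, skip_header = true})

-- The buffer now holds {{T, T, ...}} (the multireturn array); skip one
-- array header so that buf.rpos points at {T, T, ...}.
local len
len, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
assert(len == 1) -- one return value

-- Now the buffer is suitable for merger.new_source_frombuffer(buf).
```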
How to check the buffer data structure yourself:

```lua
local net_box = require('net.box')
local buffer = require('buffer')
local ffi = require('ffi')
local msgpack = require('msgpack')
local yaml = require('yaml')

box.cfg{listen = 3301}
box.once('load_data', function()
    box.schema.user.grant('guest', 'read,write,execute', 'universe')
    box.schema.space.create('s')
    box.space.s:create_index('pk')
    box.space.s:insert({1})
    box.space.s:insert({2})
    box.space.s:insert({3})
    box.space.s:insert({4})
end)

local function foo()
    return box.space.s:select()
end
_G.foo = foo

local conn = net_box.connect('localhost:3301')

local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))

local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))

os.exit()
```

## Chunked data transfer

The merger can ask for further data from a drained source when one of the
following functions is used to create a source:

* merger.new_buffer_source(gen, param, state)
* merger.new_table_source(gen, param, state)
* merger.new_tuple_source(gen, param, state)

A `gen` function should return the following values correspondingly:

* <state>, <buffer> or <nil>
* <state>, <table> or <nil>
* <state>, <tuple> or <nil>

Note: The merger understands both tuples and Lua tables ({...} and
box.tuple.new({...})) as input tuples in a table source and a tuple source,
but we refer to them just as tuples for simplicity.

Each returned buffer or table represents a chunk of data. In the case of a
tuple source a chunk always consists of one tuple. When there are no more
chunks, a `gen` function should return `nil`.
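
For instance, here is a minimal sketch of a tuple source fed from a plain Lua
table (the data and key parts are illustrative, not from the repository
examples):

```lua
local key_def = require('key_def')
local merger = require('merger')

-- A gen function for merger.new_tuple_source(): the param is the
-- tuples table, the state is the next index. It returns one tuple
-- per call and nil when the data ends.
local function gen(tuples, i)
    local tuple = tuples[i]
    if tuple == nil then return nil end
    return i + 1, tuple
end

local tuples = {{1, 'a'}, {2, 'b'}, {3, 'c'}}
local source = merger.new_tuple_source(gen, tuples, 1)

local ctx = merger.context.new(key_def.new({{fieldno = 1, type = 'unsigned'}}))
local res = merger.new(ctx, {source}):select()
```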
The following example fetches data from two storages in chunks. The first
request uses the ALL iterator and a BLOCK_SIZE limit; the subsequent ones use
the same limit and the GT iterator (with a key extracted from the last
fetched tuple).

Note: such a way to implement a cursor / pagination will work smoothly only
with unique indexes. See also #3898.

More complex scenarios are possible: using futures (the `is_async = true`
parameter of net.box methods) to fetch the next chunk while merging the
current one or, say, calling a function with several return values (some of
them need to be skipped manually in a `gen` function to let the merger read
tuples).

Note: When using the `is_async = true` net.box option one can lean on the
fact that net.box writes an answer without yielding: a partial result cannot
be observed.

```lua
-- Storage script
-- --------------

-- See chunked_example/storage.lua.

-- Client script
-- -------------

-- See chunked_example/frontend.lua.
```

Let us show which optimizations can be applied here:

* Use buffer sources to avoid unpacking tuple data recursively into Lua
  objects (which can lead to much extra LuaJIT GC work).
* Fire the next request asynchronously once we have received the previous
  one, but before we process it. This way we fetch data in the background
  while performing a merge.

These optimizations let us introduce a stored procedure on the storages that
returns a cursor and data. On a client we wait for the Nth response, decode
only the cursor from it, asynchronously fire the (N+1)th request and return
the Nth data to a merger.

The example below provides a simple cursor implementation (only for unique
indexes) and uses the vshard API on a client.

```lua
-- Storage script
-- --------------

-- See chunked_example_fast/storage.lua.

-- Client script
-- -------------

-- See chunked_example_fast/frontend.lua.
```
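
As a rough illustration of the prefetching approach, here is a sketch of a
`gen` function for a table source (not the repository example:
`cursor_select` is a hypothetical storage function that returns a cursor and
a block of tuples, and the cursor fields are illustrative):

```lua
-- param: {conn = <net.box connection>}
-- state: {future = <future of an already fired request> or nil}
local function fetch_chunk(param, state)
    local conn = param.conn
    local future = state.future

    -- No request is in flight: the source is drained.
    if future == nil then return nil end

    -- Wait for the Nth response.
    local res = future:wait_result()
    local cursor, tuples = res[1], res[2]

    -- Fire the (N+1)th request before the merger processes the Nth
    -- data, so the next fetch proceeds in the background.
    local next_future
    if not cursor.is_end then
        next_future = conn:call('cursor_select', {cursor.position},
            {is_async = true})
    end

    return {future = next_future}, tuples
end

-- The initial state carries the future of the first request:
-- merger.new_table_source(fetch_chunk, {conn = conn},
--     {future = conn:call('cursor_select', {}, {is_async = true})})
```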
## Multiplexing requests

Consider the case when the network latency between the storage machines and
frontend machine(s) is much larger than the time to process a request on a
frontend. This situation is typical when a workload consists of many small
requests.

So it can be worthwhile to 'multiplex' different requests to the storage
machines within one network request. We'll consider an approach where a
storage function returns many box.space.<...>:select(<...>) results instead
of one.

One needs to skip the iproto_data header and two array headers and then run a
merger N times on the same buffers (with the same or different contexts). No
extra data copies, no decoding of tuples into Lua memory.

```lua
-- Storage script
-- --------------

-- See multiplexed_example/storage.lua.

-- Client script
-- -------------

-- See multiplexed_example/frontend.lua.
```
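
The following sketch shows the client-side buffer postprocessing. It is
illustrative, not the repository example: `batch_select` is a hypothetical
storage function returning `{res_1, res_2, ..., res_N}`, `conn` and `ctx` are
assumed to be created as in the previous sections, and
`msgpack.decode_array()` is assumed to behave as described in the 'Preparing
buffers' section.

```lua
local buffer = require('buffer')
local msgpack = require('msgpack')
local merger = require('merger')

local buf = buffer.ibuf()
conn:call('batch_select', <...>, {buffer = buf, skip_header = true})

-- skip_header already dropped the iproto_data header; skip the
-- multireturn array header and then the results array header.
local _, n
_, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
n, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())

-- Run a merger N times on the same buffer: each pass consumes one
-- {T, T, ...} array. With M storages one would prepare one buffer per
-- storage and create M sources on each pass.
for _ = 1, n do
    local source = merger.new_source_frombuffer(buf)
    local res = merger.new(ctx, {source}):select()
    -- <...process res...>
end
```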
## Cascading mergers

The idea is simple: a merger instance is itself a merger source.

The example below is synthetic to keep it simple. Real cases where cascading
can be profitable likely involve additional layers of Tarantool instances
between a storage and a client or separate threads to merge blocks of each
level.

To be honest, nobody uses this ability for now. It exists because having the
same behaviour for a source and a merger looks like a good property of the
API.

```lua
<...requires...>

local sources = <...100 sources...>
local ctx = merger.context.new(key_def.new(<...>))

-- Create 10 mergers with 10 sources in each.
local middleware_mergers = {}
for i = 1, 10 do
    local current_sources = {}
    for j = 1, 10 do
        current_sources[j] = sources[(i - 1) * 10 + j]
    end
    middleware_mergers[i] = merger.new(ctx, current_sources)
end

-- Note: Using different contexts will lead to extra copying of
-- tuples.
local res = merger.new(ctx, middleware_mergers):select()
```

chunked_example/Makefile

+27

@@ -0,0 +1,27 @@

default: test

.PHONY: start
start:
	mkdir -p storage_1
	mkdir -p storage_2
	./storage_1.lua
	./storage_2.lua

.PHONY: stop
stop:
	test -f storage_1.pid && PID=$$(cat storage_1.pid) && kill $$PID && \
		while kill -0 $$PID 2>/dev/null; do sleep 0.1; done || true
	test -f storage_2.pid && PID=$$(cat storage_2.pid) && kill $$PID && \
		while kill -0 $$PID 2>/dev/null; do sleep 0.1; done || true

.PHONY: test
test:
	$(MAKE) start
	./frontend.lua
	$(MAKE) stop

.PHONY: clean
clean:
	rm storage_[12]/*.{xlog,snap} || true
	rmdir storage_[12] || true
	rm storage_[12].{log,pid} || true

chunked_example/frontend.lua

+51

@@ -0,0 +1,51 @@

#!/usr/bin/env tarantool

local buffer = require('buffer')
local net_box = require('net.box')
local key_def = require('key_def')
local merger = require('merger')
local yaml = require('yaml')

local BLOCK_SIZE = 2

local function fetch_chunk(param, state)
    local conn = param.conn
    local key_def_inst = param.key_def
    local last_tuple = state.last_tuple

    local opts = {limit = BLOCK_SIZE}
    local res

    if last_tuple == nil then
        -- The first request: the ALL iterator + a limit.
        res = conn.space.s:select(nil, opts)
    else
        -- Subsequent requests: the GT iterator + a limit.
        local key = key_def_inst:extract_key(last_tuple)
        opts.iterator = box.index.GT
        res = conn.space.s:select(key, opts)
    end

    if #res == 0 then return nil end

    local new_state = {conn = conn, last_tuple = res[#res]}
    return new_state, res
end

local conns = {
    net_box.connect('localhost:3301', {reconnect_after = 0.1}),
    net_box.connect('localhost:3302', {reconnect_after = 0.1}),
}

local key_parts = conns[1].space.s.index.pk.parts
local key_def_inst = key_def.new(key_parts)
local ctx = merger.context.new(key_def_inst)
local sources = {}
for i, conn in ipairs(conns) do
    local param = {conn = conn, key_def = key_def_inst}
    sources[i] = merger.new_table_source(fetch_chunk, param, {})
end
local merger_inst = merger.new(ctx, sources)
local res = merger_inst:select()
print(yaml.encode(res))
os.exit()
