Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit f33e6ea

Browse files
authored
Merge pull request #605 from erizocosmico/feature/exp-inmemoryjoins
add experimental optional feature to use in memory joins
2 parents 528cdba + 2ab5a2f commit f33e6ea

File tree

2 files changed

+163
-6
lines changed

2 files changed

+163
-6
lines changed

sql/plan/innerjoin.go

Lines changed: 111 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,18 @@ package plan
22

33
import (
44
"io"
5+
"os"
56
"reflect"
67

78
opentracing "github.com/opentracing/opentracing-go"
89
"gopkg.in/src-d/go-mysql-server.v0/sql"
910
)
1011

12+
// experimentalInMemoryJoinKey is the name of the environment variable that is
// read once, at package initialization, to enable in-memory joins.
const experimentalInMemoryJoinKey = "EXPERIMENTAL_IN_MEMORY_JOIN"
13+
// inMemoryJoinSessionVar is the key looked up with ctx.Get in
// InnerJoin.RowIter. Only the presence of a non-nil value is checked; the
// value itself is not inspected.
const inMemoryJoinSessionVar = "inmemory_joins"
14+
15+
// useInMemoryJoins is true when the environment variable named by
// experimentalInMemoryJoinKey is set to a non-empty value.
var useInMemoryJoins = os.Getenv(experimentalInMemoryJoinKey) != ""
16+
1117
// InnerJoin is an inner join between two tables.
1218
type InnerJoin struct {
1319
BinaryNode
@@ -61,12 +67,36 @@ func (j *InnerJoin) RowIter(ctx *sql.Context) (sql.RowIter, error) {
6167
return nil, err
6268
}
6369

64-
return sql.NewSpanIter(span, &innerJoinIter{
65-
l: l,
66-
rp: j.Right,
67-
ctx: ctx,
68-
cond: j.Cond,
69-
}), nil
70+
var inMemorySession bool
71+
_, val := ctx.Get(inMemoryJoinSessionVar)
72+
if val != nil {
73+
inMemorySession = true
74+
}
75+
76+
var iter sql.RowIter
77+
if useInMemoryJoins || inMemorySession {
78+
r, err := j.Right.RowIter(ctx)
79+
if err != nil {
80+
span.Finish()
81+
return nil, err
82+
}
83+
84+
iter = &innerJoinMemoryIter{
85+
l: l,
86+
r: r,
87+
ctx: ctx,
88+
cond: j.Cond,
89+
}
90+
} else {
91+
iter = &innerJoinIter{
92+
l: l,
93+
rp: j.Right,
94+
ctx: ctx,
95+
cond: j.Cond,
96+
}
97+
}
98+
99+
return sql.NewSpanIter(span, iter), nil
70100
}
71101

72102
// TransformUp implements the Transformable interface.
@@ -196,3 +226,78 @@ func (i *innerJoinIter) Close() error {
196226

197227
return nil
198228
}
229+
230+
// innerJoinMemoryIter is an inner join row iterator that reads every row of
// the right side into memory and pairs those buffered rows with each row of
// the left side.
type innerJoinMemoryIter struct {
231+
l sql.RowIter // left-side iterator; Next pulls one row at a time from it
232+
r sql.RowIter // right-side iterator; set to nil by Next once it has been read into right
233+
ctx *sql.Context // context passed to cond.Eval
234+
cond sql.Expression // join condition evaluated against each combined row
235+
pos int // index into right of the next row to pair with leftRow
236+
leftRow sql.Row // current left row; nil means a new one must be fetched from l
237+
right []sql.Row // rows read from r
238+
}
239+
240+
func (i *innerJoinMemoryIter) Next() (sql.Row, error) {
241+
for {
242+
if i.leftRow == nil {
243+
r, err := i.l.Next()
244+
if err != nil {
245+
return nil, err
246+
}
247+
248+
i.leftRow = r
249+
}
250+
251+
if i.r != nil {
252+
for {
253+
row, err := i.r.Next()
254+
if err != nil {
255+
if err == io.EOF {
256+
break
257+
}
258+
return nil, err
259+
}
260+
261+
i.right = append(i.right, row)
262+
}
263+
i.r = nil
264+
}
265+
266+
if i.pos >= len(i.right) {
267+
i.pos = 0
268+
i.leftRow = nil
269+
continue
270+
}
271+
272+
rightRow := i.right[i.pos]
273+
var row = make(sql.Row, len(i.leftRow)+len(rightRow))
274+
copy(row, i.leftRow)
275+
copy(row[len(i.leftRow):], rightRow)
276+
277+
i.pos++
278+
279+
v, err := i.cond.Eval(i.ctx, row)
280+
if err != nil {
281+
return nil, err
282+
}
283+
284+
if v == true {
285+
return row, nil
286+
}
287+
}
288+
}
289+
290+
func (i *innerJoinMemoryIter) Close() error {
291+
if err := i.l.Close(); err != nil {
292+
if i.r != nil {
293+
_ = i.r.Close()
294+
}
295+
return err
296+
}
297+
298+
if i.r != nil {
299+
return i.r.Close()
300+
}
301+
302+
return nil
303+
}

sql/plan/innerjoin_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,41 @@ func TestInnerJoin(t *testing.T) {
3838
}, rows)
3939
}
4040

41+
func TestInMemoryInnerJoin(t *testing.T) {
42+
require := require.New(t)
43+
finalSchema := append(lSchema, rSchema...)
44+
45+
ltable := mem.NewTable("left", lSchema)
46+
rtable := mem.NewTable("right", rSchema)
47+
insertData(t, ltable)
48+
insertData(t, rtable)
49+
50+
j := NewInnerJoin(
51+
NewResolvedTable(ltable),
52+
NewResolvedTable(rtable),
53+
expression.NewEquals(
54+
expression.NewGetField(0, sql.Text, "lcol1", false),
55+
expression.NewGetField(4, sql.Text, "rcol1", false),
56+
))
57+
58+
require.Equal(finalSchema, j.Schema())
59+
60+
ctx := sql.NewEmptyContext()
61+
ctx.Set(inMemoryJoinSessionVar, sql.Text, "true")
62+
63+
iter, err := j.RowIter(ctx)
64+
require.NoError(err)
65+
66+
rows, err := sql.RowIterToRows(iter)
67+
require.NoError(err)
68+
require.Len(rows, 2)
69+
70+
require.Equal([]sql.Row{
71+
{"col1_1", "col2_1", int32(1111), int64(2222), "col1_1", "col2_1", int32(1111), int64(2222)},
72+
{"col1_2", "col2_2", int32(3333), int64(4444), "col1_2", "col2_2", int32(3333), int64(4444)},
73+
}, rows)
74+
}
75+
4176
func TestInnerJoinEmpty(t *testing.T) {
4277
require := require.New(t)
4378
ctx := sql.NewEmptyContext()
@@ -118,6 +153,23 @@ func BenchmarkInnerJoin(b *testing.B) {
118153
}
119154
})
120155

156+
b.Run("in memory inner join", func(b *testing.B) {
157+
useInMemoryJoins = true
158+
require := require.New(b)
159+
160+
for i := 0; i < b.N; i++ {
161+
iter, err := n1.RowIter(ctx)
162+
require.NoError(err)
163+
164+
rows, err := sql.RowIterToRows(iter)
165+
require.NoError(err)
166+
167+
require.Equal(expected, rows)
168+
}
169+
170+
useInMemoryJoins = false
171+
})
172+
121173
b.Run("cross join with filter", func(b *testing.B) {
122174
require := require.New(b)
123175

0 commit comments

Comments
 (0)