Skip to content

Commit

Permalink
Transparently receive parameters in binary format when possible
Browse files Browse the repository at this point in the history
When preparing a statement for repeated execution (as opposed to just
parameterizing a single query using the unnamed statement) we get to
know the types of the resulting columns before we have to decide which
ones to receive in binary and which ones in text.  We can use that to
our advantage to transparently avoid unnecessary binary -> text ->
binary conversions.

This has been shown in some cases to provide massive performance
benefits, with little to no penalty even in the pathological case.  But
just to err on the safe side, an option for disabling this feature is
provided, disable_prepared_binary_result.  It is not documented in the
user-facing documentation since its use is expected to be practically
nonexistent.

In the current state of affairs, only bytea and int8/int4/int2 values are
requested in binary from the server.  Floats and time-related types are
probably the next types to get the same treatment.

Chris Bandy and Marko Tiikkaja
  • Loading branch information
johto committed May 30, 2015
1 parent 54fc392 commit c4afb3f
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 44 deletions.
10 changes: 5 additions & 5 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,23 +325,23 @@ var testIntBytes = []byte("1234")

func BenchmarkDecodeInt64(b *testing.B) {
for i := 0; i < b.N; i++ {
decode(&parameterStatus{}, testIntBytes, oid.T_int8)
decode(&parameterStatus{}, testIntBytes, oid.T_int8, formatText)
}
}

var testFloatBytes = []byte("3.14159")

func BenchmarkDecodeFloat64(b *testing.B) {
for i := 0; i < b.N; i++ {
decode(&parameterStatus{}, testFloatBytes, oid.T_float8)
decode(&parameterStatus{}, testFloatBytes, oid.T_float8, formatText)
}
}

var testBoolBytes = []byte{'t'}

func BenchmarkDecodeBool(b *testing.B) {
for i := 0; i < b.N; i++ {
decode(&parameterStatus{}, testBoolBytes, oid.T_bool)
decode(&parameterStatus{}, testBoolBytes, oid.T_bool, formatText)
}
}

Expand All @@ -358,7 +358,7 @@ var testTimestamptzBytes = []byte("2013-09-17 22:15:32.360754-07")

func BenchmarkDecodeTimestamptz(b *testing.B) {
for i := 0; i < b.N; i++ {
decode(&parameterStatus{}, testTimestamptzBytes, oid.T_timestamptz)
decode(&parameterStatus{}, testTimestamptzBytes, oid.T_timestamptz, formatText)
}
}

Expand All @@ -371,7 +371,7 @@ func BenchmarkDecodeTimestamptzMultiThread(b *testing.B) {
f := func(wg *sync.WaitGroup, loops int) {
defer wg.Done()
for i := 0; i < loops; i++ {
decode(&parameterStatus{}, testTimestamptzBytes, oid.T_timestamptz)
decode(&parameterStatus{}, testTimestamptzBytes, oid.T_timestamptz, formatText)
}
}

Expand Down
168 changes: 140 additions & 28 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,33 @@ type conn struct {
// If true, this connection is bad and all public-facing functions should
// return ErrBadConn.
bad bool

// If set, this connection should never use the binary format when
// receiving query results from prepared statements. Only provided for
// debugging.
disablePreparedBinaryResult bool
}

// Handle driver-side settings in parsed connection string.
func (c *conn) handleDriverSettings(o values) (err error) {
boolSetting := func(key string, val *bool) error {
if value := o.Get(key); value != "" {
if value == "yes" {
*val = true
} else if value == "no" {
*val = false
} else {
return fmt.Errorf("unrecognized value %q for disable_prepared_binary_result", value)
}
}
return nil
}

err = boolSetting("disable_prepared_binary_result", &c.disablePreparedBinaryResult)
if err != nil {
return err
}
return nil
}

func (c *conn) writeBuf(b byte) *writeBuf {
Expand Down Expand Up @@ -194,12 +221,16 @@ func DialOpen(d Dialer, name string) (_ driver.Conn, err error) {
}
}

c, err := dial(d, o)
cn := &conn{}
err = cn.handleDriverSettings(o)
if err != nil {
return nil, err
}

cn := &conn{c: c}
cn.c, err = dial(d, o)
if err != nil {
return nil, err
}
cn.ssl(o)
cn.buf = bufio.NewReader(cn.c)
cn.startup(o)
Expand Down Expand Up @@ -496,7 +527,7 @@ func (cn *conn) simpleExec(q string) (res driver.Result, commandTag string, err
}
}

func (cn *conn) simpleQuery(q string) (res driver.Rows, err error) {
func (cn *conn) simpleQuery(q string) (res *rows, err error) {
defer cn.errRecover(&err)

st := &stmt{cn: cn, name: ""}
Expand All @@ -518,10 +549,11 @@ func (cn *conn) simpleQuery(q string) (res driver.Rows, err error) {
errorf("unexpected message %q in simple query execution", t)
}
res = &rows{
cn: cn,
cols: st.cols,
cn: cn,
cols: st.cols,
rowTyps: st.rowTyps,
done: true,
rowFmts: st.rowFmts,
done: true,
}
case 'Z':
cn.processReadyForQuery(r)
Expand All @@ -541,9 +573,8 @@ func (cn *conn) simpleQuery(q string) (res driver.Rows, err error) {
case 'T':
// res might be non-nil here if we received a previous
// CommandComplete, but that's fine; just overwrite it
rs := &rows{cn: cn}
rs.cols, rs.rowTyps = parseMeta(r)
res = rs
res = &rows{cn: cn}
res.cols, res.rowFmts, res.rowTyps = parseMeta(r)

// To work around a bug in QueryRow in Go 1.2 and earlier, wait
// until the first DataRow has been received.
Expand All @@ -554,6 +585,50 @@ func (cn *conn) simpleQuery(q string) (res driver.Rows, err error) {
}
}

// Decides which column formats to use for a prepared statement. The input is
// an array of type oids, one element per result column.
func decideColumnFormats(rowTyps []oid.Oid, forceText bool) (rowFmts []format, rowFmtData []byte) {
rowFmts = make([]format, len(rowTyps))
if forceText {
return rowFmts, rowFmtDataAllText
}

allBinary := true
allText := true
for i, o := range rowTyps {
switch o {
// This is the list of types to use binary mode for when receiving them
// through a prepared statement. If a type appears in this list, it
// must also be implemented in binaryDecode in encode.go.
case oid.T_bytea:
fallthrough
case oid.T_int8:
fallthrough
case oid.T_int4:
fallthrough
case oid.T_int2:
rowFmts[i] = formatBinary
allText = false

default:
allBinary = false
}
}

if allBinary {
return rowFmts, rowFmtDataAllBinary
} else if allText {
return rowFmts, rowFmtDataAllText
} else {
rowFmtData = make([]byte, 2+len(rowFmts)*2)
binary.BigEndian.PutUint16(rowFmtData, uint16(len(rowFmts)))
for i, v := range rowFmts {
binary.BigEndian.PutUint16(rowFmtData[2+i*2:], uint16(v))
}
return rowFmts, rowFmtData
}
}

func (cn *conn) prepareTo(q, stmtName string) (_ *stmt, err error) {
st := &stmt{cn: cn, name: stmtName}

Expand Down Expand Up @@ -581,9 +656,11 @@ func (cn *conn) prepareTo(q, stmtName string) (_ *stmt, err error) {
st.paramTyps[i] = r.oid()
}
case 'T':
st.cols, st.rowTyps = parseMeta(r)
st.cols, st.rowTyps = parseStatementRowDescribe(r)
st.rowFmts, st.rowFmtData = decideColumnFormats(st.rowTyps, cn.disablePreparedBinaryResult)
case 'n':
// no data
st.rowFmtData = rowFmtDataAllText
case 'Z':
cn.processReadyForQuery(r)
return st, err
Expand Down Expand Up @@ -644,9 +721,10 @@ func (cn *conn) Query(query string, args []driver.Value) (_ driver.Rows, err err

st.exec(args)
return &rows{
cn: cn,
cols: st.cols,
cn: cn,
cols: st.cols,
rowTyps: st.rowTyps,
rowFmts: st.rowFmts,
}, nil
}

Expand Down Expand Up @@ -971,6 +1049,8 @@ func isDriverSetting(key string) bool {
return true
case "connect_timeout":
return true
case "disable_prepared_binary_result":
return true

default:
return false
Expand Down Expand Up @@ -1053,13 +1133,26 @@ func (cn *conn) auth(r *readBuf, o values) {
}
}

type format int

const formatText format = 0
const formatBinary format = 1

// One result-column format code with the value 1 (i.e. all binary).
var rowFmtDataAllBinary []byte = []byte{0, 1, 0, 1}

// No result-column format codes (i.e. all text).
var rowFmtDataAllText []byte = []byte{0, 0}

type stmt struct {
cn *conn
name string
cols []string
rowTyps []oid.Oid
paramTyps []oid.Oid
closed bool
cn *conn
name string
cols []string
rowFmts []format
rowFmtData []byte
rowTyps []oid.Oid
paramTyps []oid.Oid
closed bool
}

func (st *stmt) Close() (err error) {
Expand Down Expand Up @@ -1103,9 +1196,10 @@ func (st *stmt) Query(v []driver.Value) (r driver.Rows, err error) {

st.exec(v)
return &rows{
cn: st.cn,
cols: st.cols,
cn: st.cn,
cols: st.cols,
rowTyps: st.rowTyps,
rowFmts: st.rowFmts,
}, nil
}

Expand Down Expand Up @@ -1159,7 +1253,7 @@ func (st *stmt) exec(v []driver.Value) {
w.bytes(b)
}
}
w.int16(0)
w.bytes(st.rowFmtData)

w.next('E')
w.byte(0)
Expand Down Expand Up @@ -1278,11 +1372,12 @@ func (cn *conn) parseComplete(commandTag string) (driver.Result, string) {
}

type rows struct {
cn *conn
cols []string
cn *conn
cols []string
rowTyps []oid.Oid
done bool
rb readBuf
rowFmts []format
done bool
rb readBuf
}

func (rs *rows) Close() error {
Expand Down Expand Up @@ -1339,7 +1434,7 @@ func (rs *rows) Next(dest []driver.Value) (err error) {
dest[i] = nil
continue
}
dest[i] = decode(&conn.parameterStatus, rs.rb.next(l), rs.rowTyps[i])
dest[i] = decode(&conn.parameterStatus, rs.rb.next(l), rs.rowTyps[i], rs.rowFmts[i])
}
return
default:
Expand Down Expand Up @@ -1401,15 +1496,32 @@ func (c *conn) processReadyForQuery(r *readBuf) {
c.txnStatus = transactionStatus(r.byte())
}

func parseMeta(r *readBuf) (cols []string, rowTyps []oid.Oid) {
func parseStatementRowDescribe(r *readBuf) (cols []string, rowTyps []oid.Oid) {
n := r.int16()
cols = make([]string, n)
rowTyps = make([]oid.Oid, n)
for i := range cols {
cols[i] = r.string()
r.next(6)
rowTyps[i] = r.oid()
r.next(8)
r.next(6)
// format code not known; always 0
r.next(2)
}
return
}

func parseMeta(r *readBuf) (cols []string, rowFmts []format, rowTyps []oid.Oid) {
n := r.int16()
cols = make([]string, n)
rowFmts = make([]format, n)
rowTyps = make([]oid.Oid, n)
for i := range cols {
cols[i] = r.string()
r.next(6)
rowTyps[i] = r.oid()
r.next(6)
rowFmts[i] = format(r.int16())
}
return
}
Expand Down
13 changes: 9 additions & 4 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func TestEncodeDecode(t *testing.T) {
'2000-1-1 01:02:03.04-7'::timestamptz,
0::boolean,
123,
-321,
3.14::float8
WHERE
E'\\000\\001\\002'::bytea = $1
Expand Down Expand Up @@ -366,9 +367,9 @@ func TestEncodeDecode(t *testing.T) {
var got2 string
var got3 = sql.NullInt64{Valid: true}
var got4 time.Time
var got5, got6, got7 interface{}
var got5, got6, got7, got8 interface{}

err = r.Scan(&got1, &got2, &got3, &got4, &got5, &got6, &got7)
err = r.Scan(&got1, &got2, &got3, &got4, &got5, &got6, &got7, &got8)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -397,8 +398,12 @@ func TestEncodeDecode(t *testing.T) {
t.Fatalf("expected 123, got %d", got6)
}

if got7 != float64(3.14) {
t.Fatalf("expected 3.14, got %f", got7)
if got7 != int64(-321) {
t.Fatalf("expected -321, got %d", got7)
}

if got8 != float64(3.14) {
t.Fatalf("expected 3.14, got %f", got8)
}
}

Expand Down
Loading

0 comments on commit c4afb3f

Please sign in to comment.