Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spannerlib/socket-server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
binaries
17 changes: 17 additions & 0 deletions spannerlib/socket-server/build-executables.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Builds the socket server binary for darwin/arm64, linux/x64, and windows/x64.
# The binaries are stored in the following files:
# binaries/osx-arm64/spannerlib_socket_server
# binaries/linux-x64/spannerlib_socket_server
# binaries/win-x64/spannerlib_socket_server.exe

mkdir -p binaries/osx-arm64
GOOS=darwin GOARCH=arm64 go build -o binaries/osx-arm64/spannerlib_socket_server server.go
chmod +x binaries/osx-arm64/spannerlib_socket_server

mkdir -p binaries/linux-x64
GOOS=linux GOARCH=amd64 go build -o binaries/linux-x64/spannerlib_socket_server server.go
chmod +x binaries/linux-x64/spannerlib_socket_server

mkdir -p binaries/win-x64
GOOS=windows GOARCH=amd64 go build -o binaries/win-x64/spannerlib_socket_server.exe server.go
chmod +x binaries/win-x64/spannerlib_socket_server.exe
126 changes: 126 additions & 0 deletions spannerlib/socket-server/client/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package client

import (
"bufio"
"net"

"cloud.google.com/go/spanner/apiv1/spannerpb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"spannerlib/socket-server/message"
"spannerlib/socket-server/protocol"
)

type Connection struct {
pool *Pool

conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
}

func (c *Connection) Begin(options *spannerpb.TransactionOptions) error {
msg := message.CreateBeginMessage(options)
if err := msg.Write(c.writer); err != nil {
return err
}
if err := c.writer.Flush(); err != nil {
return err
}
_, err := message.ReadMessageOrError(c.reader)
if err != nil {
return err
}
return nil
}
Comment on lines +22 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The pattern of writing a message and flushing the writer is repeated in Begin, Commit, Rollback, Execute, and ExecuteBatch. This code can be refactored into a helper method to reduce duplication and improve maintainability.

For example:

func (c *Connection) send(msg message.Message) error {
	if err := msg.Write(c.writer); err != nil {
		return err
	}
	return c.writer.Flush()
}

The Begin method would then be simplified to:

func (c *Connection) Begin(options *spannerpb.TransactionOptions) error {
	msg := message.CreateBeginMessage(options)
	if err := c.send(msg); err != nil {
		return err
	}
	_, err := message.ReadMessageOrError(c.reader)
	return err
}


func (c *Connection) Commit() (*spannerpb.CommitResponse, error) {
msg := message.CreateCommitMessage()
if err := msg.Write(c.writer); err != nil {
return nil, err
}
if err := c.writer.Flush(); err != nil {
return nil, err
}
m, err := message.ReadMessageOrError(c.reader)
if err != nil {
return nil, err
}
result, ok := m.(*message.CommitResultMessage)
if !ok {
return nil, status.Error(codes.Internal, "message is not a CommitResultMessage")
}

return result.Response, nil
}

func (c *Connection) Rollback() error {
msg := message.CreateRollbackMessage()
if err := msg.Write(c.writer); err != nil {
return err
}
if err := c.writer.Flush(); err != nil {
return err
}
_, err := message.ReadMessageOrError(c.reader)
if err != nil {
return err
}
return nil
}

func (c *Connection) Execute(request *spannerpb.ExecuteSqlRequest) (*Rows, error) {
msg := message.CreateExecuteMessage(request)
if err := msg.Write(c.writer); err != nil {
return nil, err
}
if err := c.writer.Flush(); err != nil {
return nil, err
}
r, err := message.ReadMessageOrError(c.reader)
if err != nil {
return nil, err
}
rowsMsg, ok := r.(*message.RowsMessage)
if !ok {
return nil, status.Error(codes.Internal, "message type is not RowsMessage")
}
metadata, err := protocol.ReadMetadata(c.reader)
if err != nil {
return nil, err
}
return &Rows{
conn: c,
id: rowsMsg.Id,
metadata: metadata,
}, nil
}

func (c *Connection) ExecuteBatch(request *spannerpb.ExecuteBatchDmlRequest) (*spannerpb.ExecuteBatchDmlResponse, error) {
msg := message.CreateExecuteBatchMessage(request)
if err := msg.Write(c.writer); err != nil {
return nil, err
}
if err := c.writer.Flush(); err != nil {
return nil, err
}
r, err := message.ReadMessageOrError(c.reader)
if err != nil {
return nil, err
}
batchMsg, ok := r.(*message.BatchResultMessage)
if !ok {
return nil, status.Error(codes.Internal, "message type is not BatchResultMessage")
}
return batchMsg.Response, nil
}

func (c *Connection) Close() error {
if err := c.conn.Close(); err != nil {
return err
}
c.reader = nil
c.writer = nil
c.conn = nil
return nil
}
57 changes: 57 additions & 0 deletions spannerlib/socket-server/client/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package client

import (
"bufio"
"net"

"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"spannerlib/socket-server/message"
)

type Pool struct {
tp string
addr string

id string
dsn string
}

func CreatePool(tp, addr, dsn string) *Pool {
return CreatePoolWithId(tp, addr, dsn, uuid.New().String())
}

func CreatePoolWithId(tp, addr, dsn, id string) *Pool {
return &Pool{tp: tp, addr: addr, id: id, dsn: dsn}
}

func (p *Pool) CreateConnection() (*Connection, error) {
conn, err := net.Dial(p.tp, p.addr)
if err != nil {
return nil, err
}
connection := &Connection{
pool: p,
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
}
startup := message.CreateStartupMessage(p.id, p.dsn)
if err := startup.Write(connection.writer); err != nil {
return nil, err
}
if err := connection.writer.Flush(); err != nil {
return nil, err
}
if msg, err := message.ReadMessageOrError(connection.reader); err != nil {
_ = connection.Close()
return nil, err
} else {
if _, ok := msg.(*message.StatusMessage); !ok {
return nil, status.Error(codes.Internal, "message type is not StatusMessage")
}
}

return connection, nil
}
51 changes: 51 additions & 0 deletions spannerlib/socket-server/client/rows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package client

import (
"cloud.google.com/go/spanner/apiv1/spannerpb"
"google.golang.org/protobuf/types/known/structpb"
"spannerlib/socket-server/protocol"
)

type Rows struct {
conn *Connection
id int64

metadata *spannerpb.ResultSetMetadata
stats *spannerpb.ResultSetStats
}

func (r *Rows) Next() (*structpb.ListValue, error) {
var hasMoreRows bool
if err := protocol.ReadBool(r.conn.reader, &hasMoreRows); err != nil {
return nil, err
}
if !hasMoreRows {
stats, err := protocol.ReadStats(r.conn.reader)
if err != nil {
return nil, err
}
r.stats = stats
return nil, nil
}

row, err := protocol.ReadRow(r.conn.reader, r.metadata)
if err != nil {
return nil, err
}
return row, nil
}

func (r *Rows) Close() error {
if r.stats == nil {
for {
row, err := r.Next()
if err != nil {
return err
}
if row == nil {
break
}
}
}
return nil
}
45 changes: 45 additions & 0 deletions spannerlib/socket-server/message/batch_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package message

import (
"bufio"
"fmt"

"cloud.google.com/go/spanner/apiv1/spannerpb"
"spannerlib/socket-server/protocol"
)

var _ Message = &BatchResultMessage{}

type BatchResultMessage struct {
message
Response *spannerpb.ExecuteBatchDmlResponse
}

func CreateBatchResultMessage(res *spannerpb.ExecuteBatchDmlResponse) *BatchResultMessage {
return &BatchResultMessage{
message: message{messageId: BatchResultMessageId},
Response: res,
}
}

func (m *BatchResultMessage) String() string {
return fmt.Sprintf("BatchResultMessage: %v", m.Response)
}

func (m *BatchResultMessage) MessageId() Id {
return BatchResultMessageId
}

func (m *BatchResultMessage) Write(writer *bufio.Writer) error {
if err := m.writeHeader(writer); err != nil {
return err
}
if err := protocol.WriteExecuteBatchResponse(writer, m.Response); err != nil {
return err
}
return nil
}

func (m *BatchResultMessage) Handle(handler Handler) error {
return handler.HandleBatchResult(m)
}
41 changes: 41 additions & 0 deletions spannerlib/socket-server/message/begin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package message

import (
"bufio"
"fmt"

"cloud.google.com/go/spanner/apiv1/spannerpb"
"spannerlib/socket-server/protocol"
)

var _ Message = &BeginMessage{}

type BeginMessage struct {
message
Options *spannerpb.TransactionOptions
}

func CreateBeginMessage(options *spannerpb.TransactionOptions) *BeginMessage {
return &BeginMessage{
message: message{messageId: BeginMessageId},
Options: options,
}
}

func (m *BeginMessage) String() string {
return fmt.Sprintf("BeginMessage: %v", m.Options)
}

func (m *BeginMessage) Write(writer *bufio.Writer) error {
if err := m.writeHeader(writer); err != nil {
return err
}
if err := protocol.WriteTransactionOptions(writer, m.Options); err != nil {
return err
}
return nil
}

func (m *BeginMessage) Handle(handler Handler) error {
return handler.HandleBegin(m)
}
33 changes: 33 additions & 0 deletions spannerlib/socket-server/message/commit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package message

import (
"bufio"
"fmt"
)

var _ Message = &CommitMessage{}

type CommitMessage struct {
message
}

func CreateCommitMessage() *CommitMessage {
return &CommitMessage{
message: message{messageId: CommitMessageId},
}
}

func (m *CommitMessage) String() string {
return fmt.Sprintf("CommitMessage")
}

func (m *CommitMessage) Write(writer *bufio.Writer) error {
if err := m.writeHeader(writer); err != nil {
return err
}
return nil
}

func (m *CommitMessage) Handle(handler Handler) error {
return handler.HandleCommit(m)
}
Loading
Loading