Skip to content

Commit c538972

Browse files
authored
Merge pull request #6 from oracle/feature/on_demand
Added support for on-demand tables in the cloud.
2 parents 6dd089e + 8a53fed commit c538972

File tree

14 files changed

+520
-93
lines changed

14 files changed

+520
-93
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@ All notable changes to this project will be documented in this file.
33

44
The format is based on [Keep a Changelog](http://keepachangelog.com/).
55

6-
## Unpublished
6+
## [Unpublished]
77

88
### Added
99
- Added latest Oracle Cloud Infrastructure regions and region codes: LIN, MTZ, VCP, BRS, UKB, JNB, SIN, MRS, ARN, AUH, MCT, WGA.
10+
- Cloud only: Added support for creating on-demand tables
11+
- On-Prem only: Added support for setting Durability in put/delete operations
12+
- Added support for returning row modification time in get operations
13+
14+
### Changed
15+
- TableLimits now includes a CapacityMode field, to allow for specifying OnDemand. The default is Provisioned. This may affect existing code if the TableLimits struct was created without using named fields.
16+
- Internal logic now detects differences in server protocol version, and decrements its internal serial version to match. This is to maintain compatibility when a new driver is used with an old server.
1017

1118
## 1.2.2 - 2021-06-08
1219

internal/test/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ func createClient(cfg *Config) (*nosqldb.Client, error) {
148148
return nil, err
149149
}
150150

151+
// this will set the protocol serial version according to the connected server.
152+
// ignore errors here, they may be expected.
153+
client.VerifyConnection()
154+
151155
if interceptor != nil {
152156
err = interceptor.OnSetupClient(client)
153157
if err != nil {

nosqldb/bad_protocol_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ func (suite *BadProtocolTestSuite) SetupSuite() {
5353
suite.bpTestClient, err = nosqldb.NewClient(suite.Client.Config)
5454
suite.Require().NoErrorf(err, "failed to create a client, got error %v", err)
5555

56+
// this will set the serial protocol version. Ignore errors from it.
57+
suite.bpTestClient.VerifyConnection()
58+
5659
// Disable retry handling.
5760
suite.bpTestClient.RetryHandler = nil
5861
suite.bpTestClient.AuthorizationProvider = suite.Client.AuthorizationProvider
@@ -580,6 +583,7 @@ func (suite *BadProtocolTestSuite) TestBadPutRequest() {
580583
3, // RequestTimeout: packed int
581584
suite.tableLen, // TableName: String
582585
1, // ReturnRow: boolean
586+
1, // Durability: 1 byte (serialVersion > 2)
583587
1, // ExactMatch: boolean
584588
1, // IdentityCacheSize: packed int
585589
suite.valueLen, // Record: MapValue
@@ -629,7 +633,10 @@ func (suite *BadProtocolTestSuite) TestBadPutRequest() {
629633
}
630634

631635
// Invalid TTL value/unit.
632-
off = seekPos(lengths, 9)
636+
off = seekPos(lengths, 10)
637+
if suite.bpTestClient.GetSerialVersion() < 3 {
638+
off -= 1 // Durability
639+
}
633640
desc = "invalid TTL value"
634641
copy(data, origData)
635642
suite.wr.Reset()
@@ -718,6 +725,7 @@ func (suite *BadProtocolTestSuite) TestBadWriteMultipleRequest() {
718725
3, // RequestTimeout: packed int
719726
suite.tableLen, // TableName: string
720727
1, // OperationNum: packed int
728+
1, // Durability: 1 byte (serialVersion > 2)
721729
1, // abortOnFail: boolean
722730
0, // Sub requests: the size does not matter for this test.
723731
}
@@ -744,7 +752,10 @@ func (suite *BadProtocolTestSuite) TestBadWriteMultipleRequest() {
744752
}
745753

746754
// Invalid opcode for sub requests.
747-
off = seekPos(lengths, 6)
755+
off = seekPos(lengths, 7)
756+
if suite.bpTestClient.GetSerialVersion() < 3 {
757+
off -= 1 // Durability
758+
}
748759
testOpCodes := []proto.OpCode{proto.OpCode(-1), proto.Get}
749760
for _, v := range testOpCodes {
750761
desc = fmt.Sprintf("invalid opcode %v", v)
@@ -772,6 +783,7 @@ func (suite *BadProtocolTestSuite) TestBadMultiDeleteRequest() {
772783
1, // OpCode: byte
773784
3, // RequestTimeout: packed int
774785
suite.tableLen, // TableName: string
786+
1, // Durability: 1 byte (serialVersion > 2)
775787
suite.keyLen, // Key: MapValue
776788
1, // HasFieldRange: boolean
777789
3, // MaxWriteKB: packed int
@@ -788,7 +800,10 @@ func (suite *BadProtocolTestSuite) TestBadMultiDeleteRequest() {
788800
suite.doBadProtoTest(req, data, desc, 0)
789801

790802
// Invalid MaxWriteKB.
791-
off = seekPos(lengths, 6)
803+
off = seekPos(lengths, 7)
804+
if suite.bpTestClient.GetSerialVersion() < 3 {
805+
off -= 1 // Durability
806+
}
792807
var tests []int
793808
if test.IsOnPrem() {
794809
// There is no limit on MaxWriteKB for the on-premise server.
@@ -807,7 +822,10 @@ func (suite *BadProtocolTestSuite) TestBadMultiDeleteRequest() {
807822
}
808823

809824
// Invalid length of ContinuationKey.
810-
off = seekPos(lengths, 7)
825+
off = seekPos(lengths, 8)
826+
if suite.bpTestClient.GetSerialVersion() < 3 {
827+
off -= 1 // Durability
828+
}
811829
tests = []int{-2, 100}
812830
for _, v := range tests {
813831
desc = fmt.Sprintf("invalid length of ContinuationKey %v", v)
@@ -823,9 +841,14 @@ func (suite *BadProtocolTestSuite) TestBadTableRequest() {
823841
newTable := "ABCD"
824842
stmt := "create table if not exists " + newTable + " (id integer, primary key(id))"
825843
stmtLen, _ := suite.wr.WriteString(&stmt)
844+
limits := &nosqldb.TableLimits{
845+
ReadUnits: 50,
846+
WriteUnits: 50,
847+
StorageGB: 2,
848+
}
826849
req := &nosqldb.TableRequest{
827850
Statement: stmt,
828-
TableLimits: &nosqldb.TableLimits{50, 50, 5},
851+
TableLimits: limits,
829852
}
830853

831854
var desc string
@@ -839,6 +862,7 @@ func (suite *BadProtocolTestSuite) TestBadTableRequest() {
839862
4, // ReadKB: int
840863
4, // WriteKB: int
841864
4, // StorageGB: int
865+
1, // LimitMode: byte (serialVersion > 2)
842866
1, // HasTableName: boolean
843867
}
844868

nosqldb/client.go

Lines changed: 115 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/oracle/nosql-go-sdk/nosqldb/jsonutil"
3232
"github.com/oracle/nosql-go-sdk/nosqldb/logger"
3333
"github.com/oracle/nosql-go-sdk/nosqldb/nosqlerr"
34+
"github.com/oracle/nosql-go-sdk/nosqldb/types"
3435
)
3536

3637
// Client represents an Oracle NoSQL database client used to access the Oracle
@@ -79,6 +80,12 @@ type Client struct {
7980
// Keep an internal map of tablename to next limits update time
8081
tableLimitUpdateMap map[string]int64
8182
limitMux sync.Mutex
83+
84+
// (possibly negotiated) version of the protocol in use
85+
serialVersion int16
86+
87+
// for managing one-time messaging
88+
oneTimeMessages map[string]struct{}
8289
}
8390

8491
var (
@@ -112,14 +119,15 @@ func NewClient(cfg Config) (*Client, error) {
112119
}
113120

114121
c := &Client{
115-
Config: cfg,
116-
HTTPClient: cfg.httpClient,
117-
requestURL: cfg.Endpoint + sdkutil.DataServiceURI,
118-
requestID: 0,
119-
serverHost: cfg.host,
120-
executor: cfg.httpClient,
121-
logger: cfg.Logger,
122-
isCloud: cfg.IsCloud() || cfg.IsCloudSim(),
122+
Config: cfg,
123+
HTTPClient: cfg.httpClient,
124+
requestURL: cfg.Endpoint + sdkutil.DataServiceURI,
125+
requestID: 0,
126+
serverHost: cfg.host,
127+
executor: cfg.httpClient,
128+
logger: cfg.Logger,
129+
isCloud: cfg.IsCloud() || cfg.IsCloudSim(),
130+
serialVersion: proto.DefaultSerialVersion,
123131
}
124132
c.handleResponse = c.processResponse
125133
c.queryLogger, err = newQueryLogger()
@@ -132,6 +140,8 @@ func NewClient(cfg Config) (*Client, error) {
132140
c.rateLimiterMap = make(map[string]common.RateLimiterPair)
133141
}
134142

143+
c.oneTimeMessages = make(map[string]struct{})
144+
135145
return c, nil
136146
}
137147

@@ -145,9 +155,8 @@ func (c *Client) Close() error {
145155
c.queryLogger.Close()
146156
}
147157

148-
if c.logger != nil {
149-
c.logger.Close()
150-
}
158+
// do not close logger; it may have been passed to us and
159+
// may still be in use by the application
151160

152161
return nil
153162
}
@@ -800,7 +809,7 @@ func (c *Client) processRequest(req Request) (data []byte, err error) {
800809
return nil, err
801810
}
802811

803-
data, err = serializeRequest(req)
812+
data, err = c.serializeRequest(req)
804813
if err != nil || !c.isCloud {
805814
return
806815
}
@@ -952,7 +961,16 @@ func (c *Client) doExecute(ctx context.Context, req Request, data []byte) (resul
952961
}
953962
}
954963

955-
if !c.handleError(err, req, numThrottleRetries) {
964+
if nosqlerr.Is(err, nosqlerr.UnsupportedProtocol) {
965+
if c.decrementSerialVersion() == false {
966+
return nil, err
967+
}
968+
// if serial version mismatch, we must re-serialize the request
969+
data, err = c.serializeRequest(req)
970+
if err != nil {
971+
return nil, err
972+
}
973+
} else if !c.handleError(err, req, numThrottleRetries) {
956974
return nil, err
957975
}
958976

@@ -1035,6 +1053,35 @@ func (c *Client) doExecute(ctx context.Context, req Request, data []byte) (resul
10351053
return nil, err
10361054
}
10371055

1056+
// warn if using features not implemented at the connected server
1057+
// currently cloud does not support Durability
1058+
if c.serialVersion < 3 || c.isCloud {
1059+
needMsg := false
1060+
if pReq, ok := req.(*PutRequest); ok && pReq.Durability.IsSet() {
1061+
needMsg = true
1062+
} else if dReq, ok := req.(*DeleteRequest); ok && dReq.Durability.IsSet() {
1063+
needMsg = true
1064+
} else if mReq, ok := req.(*MultiDeleteRequest); ok && mReq.Durability.IsSet() {
1065+
needMsg = true
1066+
} else if wReq, ok := req.(*WriteMultipleRequest); ok && wReq.Durability.IsSet() {
1067+
needMsg = true
1068+
}
1069+
if needMsg {
1070+
c.oneTimeMessage("The requested feature is not supported " +
1071+
"by the connected server: Durability")
1072+
}
1073+
}
1074+
1075+
// OnDemand is not available in V2
1076+
if c.serialVersion < 3 {
1077+
if tReq, ok := req.(*TableRequest); ok && tReq.TableLimits != nil {
1078+
if tReq.TableLimits.CapacityMode == types.OnDemand {
1079+
c.oneTimeMessage("The requested feature is not supported " +
1080+
"by the connected server: on demand capacity table")
1081+
}
1082+
}
1083+
}
1084+
10381085
reqCtx, reqCancel := context.WithTimeout(ctx, reqTimeout)
10391086
httpReq = httpReq.WithContext(reqCtx)
10401087
httpResp, err = c.executor.Do(httpReq)
@@ -1287,13 +1334,13 @@ func (c *Client) signHTTPRequest(httpReq *http.Request) error {
12871334
// serializeRequest serializes the specified request into a slice of bytes that
12881335
// will be sent to the server. The serial version is always written followed by
12891336
// the actual request payload.
1290-
func serializeRequest(req Request) (data []byte, err error) {
1337+
func (c *Client) serializeRequest(req Request) (data []byte, err error) {
12911338
wr := binary.NewWriter()
1292-
if _, err = wr.WriteSerialVersion(proto.SerialVersion); err != nil {
1339+
if _, err = wr.WriteSerialVersion(c.serialVersion); err != nil {
12931340
return
12941341
}
12951342

1296-
if err = req.serialize(wr); err != nil {
1343+
if err = req.serialize(wr, c.serialVersion); err != nil {
12971344
return
12981345
}
12991346

@@ -1329,7 +1376,7 @@ func (c *Client) processOKResponse(data []byte, req Request) (Result, error) {
13291376

13301377
// A zero byte represents the operation succeeded.
13311378
if code == 0 {
1332-
res, err := req.deserialize(rd)
1379+
res, err := req.deserialize(rd, c.serialVersion)
13331380
if err != nil {
13341381
return nil, err
13351382
}
@@ -1372,6 +1419,10 @@ func wrapResponseErrors(code int, msg string) error {
13721419
return nosqlerr.New(errCode, "unknown error: %s", msg)
13731420

13741421
case nosqlerr.BadProtocolMessage:
1422+
// V2 proxy will return this message if V3 is used in the driver
1423+
if strings.Contains(msg, "Invalid driver serial version") {
1424+
return nosqlerr.New(nosqlerr.UnsupportedProtocol, msg)
1425+
}
13751426
return nosqlerr.NewIllegalArgument("bad protocol message: %s", msg)
13761427

13771428
default:
@@ -1423,3 +1474,50 @@ func (c *Client) ResetRateLimiters(tableName string) {
14231474
rp.WriteLimiter.Reset()
14241475
rp.ReadLimiter.Reset()
14251476
}
1477+
1478+
// VerifyConnection attempts to verify that the connection is useable.
1479+
// It may check auth credentials, and may negotiate the protocol level
1480+
// to use with the server.
1481+
// This is typically only used in tests.
1482+
func (c *Client) VerifyConnection() error {
1483+
1484+
// issue a GetTable call for a (probably) nonexistent table.
1485+
// expect a TableNotFound error (or success in the unlikely event a
1486+
// table exists with this name). Any other errors will be returned here.
1487+
// Internally, this may result in the client negotiating a lower
1488+
// protocol version, if connected to an older server.
1489+
req := &GetTableRequest{
1490+
TableName: "noop",
1491+
Timeout: 20 * time.Second,
1492+
}
1493+
1494+
_, err := c.GetTable(req)
1495+
if err != nil && nosqlerr.IsTableNotFound(err) == false {
1496+
return err
1497+
}
1498+
1499+
return nil
1500+
}
1501+
1502+
// decrementSerialVersion attempts to reduce the serial version used for
1503+
// communicating with the server. If the version is already at its lowest
1504+
// value, it will not be decremented and false will be returned.
1505+
func (c *Client) decrementSerialVersion() bool {
1506+
if c.serialVersion > 2 {
1507+
c.serialVersion--
1508+
return true
1509+
}
1510+
return false
1511+
}
1512+
1513+
// GetSerialVersion is used for tests.
1514+
func (c *Client) GetSerialVersion() int16 {
1515+
return c.serialVersion
1516+
}
1517+
1518+
func (c *Client) oneTimeMessage(msg string) {
1519+
if _, ok := c.oneTimeMessages[msg]; ok == false {
1520+
c.oneTimeMessages[msg] = struct{}{}
1521+
c.logger.Warn(msg)
1522+
}
1523+
}

nosqldb/data_ops_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -941,9 +941,9 @@ func (suite *DataOpsTestSuite) TestNonNumericDataTypes() {
941941
// int/int64 values are valid for TIMESTAMP data type starting with NoSQL
942942
// Server 20.2.14 on-premise release and Cloud Simulator 1.3.2 release.
943943
//if suite.IsOnPrem() && suite.Version >= "20.2.14" || suite.IsCloudSim() && suite.Version >= "1.3.2" {
944-
tsTest.validValues = append(tsTest.validValues, []types.FieldValue{intVal, int64Val}...)
944+
tsTest.validValues = append(tsTest.validValues, []types.FieldValue{intVal, int64Val}...)
945945
//} else {
946-
//tsTest.invalidValues = append(tsTest.invalidValues, []types.FieldValue{intVal, int64Val}...)
946+
//tsTest.invalidValues = append(tsTest.invalidValues, []types.FieldValue{intVal, int64Val}...)
947947
//}
948948

949949
testCases = append(testCases, tsTest)

0 commit comments

Comments
 (0)