Commit

Support for multi-source followers
oxtoacart committed Jul 19, 2019
1 parent 256bea1 commit f5be000
Showing 32 changed files with 2,311 additions and 1,143 deletions.
411 changes: 262 additions & 149 deletions cluster_follow.go

Large diffs are not rendered by default.

29 changes: 16 additions & 13 deletions cluster_query.go
@@ -46,12 +46,12 @@ func (db *DB) remoteQueryHandlerForPartition(partition int) planner.QueryCluster
func (db *DB) queryForRemote(ctx context.Context, sqlString string, isSubQuery bool, subQueryResults [][]interface{}, unflat bool, onFields core.OnFields, onRow core.OnRow, onFlatRow core.OnFlatRow) (result interface{}, err error) {
source, prepareErr := db.Query(sqlString, isSubQuery, subQueryResults, common.ShouldIncludeMemStore(ctx))
if prepareErr != nil {
log.Errorf("Error on preparing query for remote: %v", prepareErr)
db.log.Errorf("Error on preparing query for remote: %v", prepareErr)
return nil, prepareErr
}
elapsed := mtime.Stopwatch()
defer func() {
log.Debugf("Processed query in %v, error?: %v : %v", elapsed(), err, sqlString)
db.log.Debugf("Processed query in %v, error?: %v : %v", elapsed(), err, sqlString)
}()
if unflat {
result, err = core.UnflattenOptimized(source).Iterate(ctx, onFields, onRow)
@@ -156,7 +156,7 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
elapsed := mtime.Stopwatch()
query := db.remoteQueryHandlerForPartition(partition)
if query == nil {
log.Errorf("No query handler for partition %d, ignoring", partition)
db.log.Errorf("No query handler for partition %d, ignoring", partition)
results <- &remoteResult{
partition: partition,
totalRows: 0,
@@ -213,10 +213,10 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
if err != nil {
switch err.(type) {
case common.Retriable:
log.Debugf("Failed on partition %d but error is retriable, continuing: %v", partition, err)
db.log.Debugf("Failed on partition %d but error is retriable, continuing: %v", partition, err)
continue
default:
log.Debugf("Failed on partition %d and error is not retriable, will abort: %v", partition, err)
db.log.Debugf("Failed on partition %d and error is not retriable, will abort: %v", partition, err)
}
}
var highWaterMark int64
@@ -241,9 +241,12 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
if ctxHasDeadline {
deadline = ctxDeadline
}
log.Debugf("Deadline for results from partitions: %v (T - %v)", deadline, deadline.Sub(time.Now()))
db.log.Debugf("Deadline for results from partitions: %v (T - %v)", deadline, deadline.Sub(time.Now()))

timeout := deadline.Sub(time.Now())
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()

timeout := time.NewTimer(deadline.Sub(time.Now()))
var canonicalFields core.Fields
fieldsByPartition := make([]core.Fields, db.opts.NumPartitions)
partitionRowMappers := make([]func(core.Vals) core.Vals, db.opts.NumPartitions)
@@ -255,7 +258,7 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
partitionFields := result.fields
if partitionFields != nil {
if canonicalFields == nil {
log.Debugf("fields: %v", partitionFields)
db.log.Debugf("fields: %v", partitionFields)
err := onFields(partitionFields)
if err != nil {
fail(result.partition, err)
@@ -305,14 +308,14 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
resultCount++
pendingPartitions--
if result.err != nil {
log.Errorf("Error from partition %d: %v", result.partition, result.err)
db.log.Errorf("Error from partition %d: %v", result.partition, result.err)
fail(result.partition, result.err)
}
finish(result)
log.Debugf("%d/%d got %d results from partition %d in %v", resultCount, db.opts.NumPartitions, result.totalRows, result.partition, result.elapsed)
db.log.Debugf("%d/%d got %d results from partition %d in %v", resultCount, db.opts.NumPartitions, result.totalRows, result.partition, result.elapsed)
delete(resultsByPartition, result.partition)
-case <-timeout.C:
-log.Errorf("Failed to get results by deadline, %d of %d partitions reporting", resultCount, numPartitions)
+case <-timeoutTimer.C:
+db.log.Errorf("Failed to get results within %v, %d of %d partitions reporting", timeout, resultCount, numPartitions)
msg := bytes.NewBuffer([]byte("Missing partitions: "))
first := true
for partition, results := range resultsByPartition {
@@ -323,7 +326,7 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
msg.WriteString(fmt.Sprintf("%d (%d)", partition, results))
fail(partition, core.ErrDeadlineExceeded)
}
-log.Debug(msg.String())
+db.log.Debug(msg.String())
return finalStats(), finalErr()
}
}
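
The most substantive change shown in this file is the new timeout handling in queryCluster: instead of selecting on the channel of an anonymous time.NewTimer, the remaining time until the deadline is computed once, a named timer is built from it and stopped via defer, and the same duration is reused in the timeout error message. Below is a minimal, self-contained Go sketch of that pattern; the function waitForResults, the results channel, and the numPartitions argument are illustrative stand-ins, not names from this repository.

package main

import (
	"fmt"
	"time"
)

// waitForResults illustrates the timer pattern from the commit: compute the
// timeout duration once, build a timer from it, and stop the timer on return
// so it does not linger after all partitions have reported.
func waitForResults(deadline time.Time, results <-chan int, numPartitions int) error {
	timeout := deadline.Sub(time.Now())
	timeoutTimer := time.NewTimer(timeout)
	defer timeoutTimer.Stop()

	resultCount := 0
	for resultCount < numPartitions {
		select {
		case r := <-results:
			resultCount++
			fmt.Printf("%d/%d got result %d\n", resultCount, numPartitions, r)
		case <-timeoutTimer.C:
			// Report how long we waited and how many partitions made it,
			// mirroring the error message added in this commit.
			return fmt.Errorf("failed to get results within %v, %d of %d partitions reporting",
				timeout, resultCount, numPartitions)
		}
	}
	return nil
}

func main() {
	results := make(chan int, 2)
	results <- 1
	results <- 2
	// Expect 3 partitions but deliver only 2, so the timeout path fires.
	if err := waitForResults(time.Now().Add(50*time.Millisecond), results, 3); err != nil {
		fmt.Println(err)
	}
}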