Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for multi-source following #79

Merged
merged 14 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
411 changes: 262 additions & 149 deletions cluster_follow.go

Large diffs are not rendered by default.

29 changes: 16 additions & 13 deletions cluster_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (db *DB) remoteQueryHandlerForPartition(partition int) planner.QueryCluster
func (db *DB) queryForRemote(ctx context.Context, sqlString string, isSubQuery bool, subQueryResults [][]interface{}, unflat bool, onFields core.OnFields, onRow core.OnRow, onFlatRow core.OnFlatRow) (result interface{}, err error) {
source, prepareErr := db.Query(sqlString, isSubQuery, subQueryResults, common.ShouldIncludeMemStore(ctx))
if prepareErr != nil {
log.Errorf("Error on preparing query for remote: %v", prepareErr)
db.log.Errorf("Error on preparing query for remote: %v", prepareErr)
return nil, prepareErr
}
elapsed := mtime.Stopwatch()
defer func() {
log.Debugf("Processed query in %v, error?: %v : %v", elapsed(), err, sqlString)
db.log.Debugf("Processed query in %v, error?: %v : %v", elapsed(), err, sqlString)
}()
if unflat {
result, err = core.UnflattenOptimized(source).Iterate(ctx, onFields, onRow)
Expand Down Expand Up @@ -156,7 +156,7 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
elapsed := mtime.Stopwatch()
query := db.remoteQueryHandlerForPartition(partition)
if query == nil {
log.Errorf("No query handler for partition %d, ignoring", partition)
db.log.Errorf("No query handler for partition %d, ignoring", partition)
results <- &remoteResult{
partition: partition,
totalRows: 0,
Expand Down Expand Up @@ -213,10 +213,10 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
if err != nil {
switch err.(type) {
case common.Retriable:
log.Debugf("Failed on partition %d but error is retriable, continuing: %v", partition, err)
db.log.Debugf("Failed on partition %d but error is retriable, continuing: %v", partition, err)
continue
default:
log.Debugf("Failed on partition %d and error is not retriable, will abort: %v", partition, err)
db.log.Debugf("Failed on partition %d and error is not retriable, will abort: %v", partition, err)
}
}
var highWaterMark int64
Expand All @@ -241,9 +241,12 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
if ctxHasDeadline {
deadline = ctxDeadline
}
log.Debugf("Deadline for results from partitions: %v (T - %v)", deadline, deadline.Sub(time.Now()))
db.log.Debugf("Deadline for results from partitions: %v (T - %v)", deadline, deadline.Sub(time.Now()))

timeout := deadline.Sub(time.Now())
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()

timeout := time.NewTimer(deadline.Sub(time.Now()))
var canonicalFields core.Fields
fieldsByPartition := make([]core.Fields, db.opts.NumPartitions)
partitionRowMappers := make([]func(core.Vals) core.Vals, db.opts.NumPartitions)
Expand All @@ -255,7 +258,7 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
partitionFields := result.fields
if partitionFields != nil {
if canonicalFields == nil {
log.Debugf("fields: %v", partitionFields)
db.log.Debugf("fields: %v", partitionFields)
err := onFields(partitionFields)
if err != nil {
fail(result.partition, err)
Expand Down Expand Up @@ -305,14 +308,14 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
resultCount++
pendingPartitions--
if result.err != nil {
log.Errorf("Error from partition %d: %v", result.partition, result.err)
db.log.Errorf("Error from partition %d: %v", result.partition, result.err)
fail(result.partition, result.err)
}
finish(result)
log.Debugf("%d/%d got %d results from partition %d in %v", resultCount, db.opts.NumPartitions, result.totalRows, result.partition, result.elapsed)
db.log.Debugf("%d/%d got %d results from partition %d in %v", resultCount, db.opts.NumPartitions, result.totalRows, result.partition, result.elapsed)
delete(resultsByPartition, result.partition)
case <-timeout.C:
log.Errorf("Failed to get results by deadline, %d of %d partitions reporting", resultCount, numPartitions)
case <-timeoutTimer.C:
db.log.Errorf("Failed to get results by within %v, %d of %d partitions reporting", timeout, resultCount, numPartitions)
msg := bytes.NewBuffer([]byte("Missing partitions: "))
first := true
for partition, results := range resultsByPartition {
Expand All @@ -323,7 +326,7 @@ func (db *DB) queryCluster(ctx context.Context, sqlString string, isSubQuery boo
msg.WriteString(fmt.Sprintf("%d (%d)", partition, results))
fail(partition, core.ErrDeadlineExceeded)
}
log.Debug(msg.String())
db.log.Debug(msg.String())
return finalStats(), finalErr()
}
}
Expand Down
Loading