-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for multi-source following #79
Conversation
@@ -93,3 +94,67 @@ func NanosToMillis(nanos int64) int64 { | |||
func TimeToMillis(ts time.Time) int64 { | |||
return NanosToMillis(ts.UnixNano()) | |||
} | |||
|
|||
// OffsetsBySource is a map of wal Offsets keyed to source ids | |||
type OffsetsBySource map[int]wal.Offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new type appears in a lot of places. Basically, in places where we used to track a single offset, we now track a map of offsets keyed to source IDs.
@@ -0,0 +1,384 @@ | |||
package server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new package includes a lot of the logic that used to live in the zeno command. Pulling it out into its own package makes it easier to test as well as embed.
c580132
to
0da5576
Compare
|
||
dest := leader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logic moved into server.go
@@ -27,7 +27,7 @@ type ClientOpts struct { | |||
} | |||
|
|||
type Inserter interface { | |||
Insert(ts time.Time, dims map[string]interface{}, vals func(func(string, interface{}))) error | |||
Insert(ts time.Time, dims map[string]interface{}, vals func(func(string, float64))) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drive-by, being a bit more strict in what gets passed to this interface function.
@@ -0,0 +1,271 @@ | |||
package server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old zenodb_test simulated clusters but didn't actually use the real RPC logic. This new test actually uses the real RPC logic to make sure that code-path is exercised.
b2627e3
to
2316219
Compare
7ade067
to
17c6fb7
Compare
db7dcd1
to
f5be000
Compare
@@ -31,16 +34,16 @@ type walEntry struct { | |||
} | |||
|
|||
type followSpec struct { | |||
followerID int | |||
followerID common.FollowerID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main change to this file is that followers are now identified by their ID, which is a combination of the server ID and the partition number.
db.log.Debug("Done force flushing tables") | ||
} | ||
|
||
// Go starts a goroutine with a task. The task should look for the stop channel to close, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll see this function used in a lot of places. This gives us a standard pattern for stopping goroutines when we close the database.
) | ||
|
||
var ( | ||
log = golog.LoggerFor("zenodb") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logger is now a property on the database so that we can include the database type and id as part of our log messages.
return errors.New("Error iterating on %v: %v", inFile, err) | ||
} | ||
} | ||
// TODO: make this work with multi-source offsets |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't regularly used in production and we can add it back later.
"github.com/getlantern/zenodb/core" | ||
"github.com/getlantern/zenodb/encoding" | ||
) | ||
|
||
const ( | ||
// File format versions | ||
FileVersion_4 = 4 | ||
CurrentFileVersion = FileVersion_4 | ||
FileVersion_5 = 5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We introduce a new file version because the header now includes not one offset, but multiple offsets per source (leader).
} | ||
|
||
func (sc *snappyConn) Read(p []byte) (int, error) { | ||
return sc.r.Read(p) | ||
} | ||
|
||
func (sc *snappyConn) Write(p []byte) (int, error) { | ||
return sc.w.Write(p) | ||
sc.mx.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drive-by fix for race condition.
@@ -60,100 +63,6 @@ func TestSingleDB(t *testing.T) { | |||
}) | |||
} | |||
|
|||
func TestClusterPushdownSinglePartition(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cluster stuff never actually tested the full clustering path because it bypassed RPC. The new tests in server_test.go cover real clusters, so I removed these.
@@ -805,65 +709,3 @@ func (er expectedResult) assert(t *testing.T, db *DB, sqlString string, includeM | |||
assert.False(t, timedOut, "Timed out running %v", sqlString) | |||
} | |||
} | |||
|
|||
type expectedRow struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to testsupport for sharing with server_test
0451923
to
4220da5
Compare
c0d93f9
to
80b3b2e
Compare
… actually configured with a zero ID
Tests passing reliably and quickly
347f915
to
3b12eb0
Compare
This allows us to designate multiple intake nodes.
For getlantern/lantern-internal#2852
Depends on getlantern/wal#5
Okay, this is pretty much ready. There's an occasional test failure due to port reuse which I still need to sort out, but I think we're good otherwise.Okay, tests seem to pass reliably.