
Review changes from upstream #2

Closed
wants to merge 0 commits into from

Conversation

@angelini angelini commented Jul 19, 2024

@airhorns I've been leaving some notes here, but I'm not really sure about some of these changes.

@@ -24,6 +25,12 @@ func initPgxConnections(cfg *config.DatabaseCfg, logger *slog.Logger) (*pgx.Conn
        Password: cfg.Password,
    }

    if cfg.SSL {
        pgxConf.TLSConfig = &tls.Config{
            InsecureSkipVerify: true,

Do we want to be doing this in prod? Can we wrap this in an env check?
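
Something roughly like this, maybe (just a sketch; cfg.Environment is a hypothetical field, use whatever env signal we already have):

if cfg.SSL {
    tlsConf := &tls.Config{}
    // Only allow skipping certificate verification outside production.
    if cfg.Environment != "production" { // hypothetical field
        tlsConf.InsecureSkipVerify = true
    }
    pgxConf.TLSConfig = tlsConf
}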

    }

    l.log.Debug("WAL message has been received", slog.Uint64("wal", msg.WalMessage.WalStart))
}).Pipe(runtime.NumCPU(), func(raw interface{}) interface{} {

Is it the Pipe library that's introducing the use of any and casting between each stage?

Would it not make more sense to use typed channels here?
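
For comparison, here's roughly what a typed stage could look like with generics, so each stage's input and output types are checked at compile time and no casting is needed (illustrative only, not tied to the current Pipe API; needs the sync import):

// pipeStage fans work out across `concurrency` goroutines while keeping
// the input and output types explicit.
func pipeStage[In, Out any](in <-chan In, concurrency int, fn func(In) Out) <-chan Out {
    out := make(chan Out)
    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range in {
                out <- fn(v)
            }
        }()
    }
    go func() {
        // Close the output once every worker has drained the input.
        wg.Wait()
        close(out)
    }()
    return out
}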

    // loop over once triggering the publish of all the events
    for i, event := range batch.events {
        subjectNames[i] = event.SubjectName(l.cfg)
        results[i] = l.publisher.Publish(ctx, subjectNames[i], event)

Aren't we going to mess up ordering here?

Events are fully ordered coming out of the last stage, but now we're fanning out across numCPU goroutines; wouldn't that allow a batch to be published out of order?

    // loop over a second time awaiting the result of each publish
    for i, result := range results {
        _, err := result.Get(ctx)
        if err != nil {
            l.monitor.IncProblematicEvents(problemKindPublish)
            return fmt.Errorf("publish: %w", err)

This seems kind of problematic: we increment a failure metric for the first failure but then return out of this whole process, leaving all the other results hanging. That can hide a lot of problems and leave background work dangling.
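
A sketch of what I mean: await every result, count every failure, and only then return an aggregate error (errors.Join is in the standard library since Go 1.20; the rest mirrors the existing loop):

// Await every publish result so no background work is left dangling,
// and record each failure instead of only the first one.
var errs []error
for _, result := range results {
    if _, err := result.Get(ctx); err != nil {
        l.monitor.IncProblematicEvents(problemKindPublish)
        errs = append(errs, err)
    }
}
if len(errs) > 0 {
    return fmt.Errorf("publish: %w", errors.Join(errs...))
}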


    msg, err := l.replicator.WaitForReplicationMessage(ctx)
    if err != nil {
        wg.Add(1)

Why do you add to the wg in the failure mode but not when you push a message?

l.log.Debug("ack WAL message", slog.Uint64("lsn", l.readLSN()))
}
default:
panic(fmt.Sprintf("unexpected type in pipeline: %T", v))

This feels a bit smelly; if we were using typed channels, a type switch like this wouldn't be necessary.


func newPipe(job Job, concurrency int, processErrors bool) *pipe {
    p := &pipe{
        in: make(jobChan, 1),

Why are all of the channels buffered with a size of 1?

Senders will still block whenever there is already a message sitting in the channel.

    }
    var orderingKey string
    if p.enableOrdering {
        if len(event.PrimaryKey) > 0 {

If we have ordering enabled but no primary key, this table will be limited to 1Mbps of throughput?

Should we warn or error out in that case?
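
Even a loud log line would help here, something along these lines (sketch only; assumes the publisher can reach the logger, and the table-name field is illustrative):

if p.enableOrdering && len(event.PrimaryKey) == 0 {
    // Without a primary key every event for this table shares one ordering key,
    // so its publish throughput is capped; surface that loudly.
    p.log.Warn("ordering enabled but table has no primary key; publishing will be serialized",
        slog.String("table", event.TableName)) // hypothetical field
}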

@angelini

I really think I'd avoid all of the uses of WaitGroup and switch to errgroup. From the Go by Example page about wait groups:

Note that this approach has no straightforward way to propagate errors from workers. For more advanced use cases, consider using the errgroup package.

https://gobyexample.com/waitgroups
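
As a rough sketch of the shape that takes (golang.org/x/sync/errgroup; the worker body is illustrative):

g, gctx := errgroup.WithContext(ctx)
for i := 0; i < runtime.NumCPU(); i++ {
    g.Go(func() error {
        // Same work the current goroutines do, but a returned error
        // cancels gctx and is propagated out of g.Wait().
        return processMessages(gctx) // hypothetical worker
    })
}
if err := g.Wait(); err != nil {
    return fmt.Errorf("pipeline: %w", err)
}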
