Skip to content

Commit

Permalink
Merge pull request #20 from ihippik/refactoring
Browse files Browse the repository at this point in the history
Refactoring
  • Loading branch information
ihippik authored Dec 16, 2023
2 parents 2d973de + f9e2605 commit d618dbd
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 45 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ If you specify an DSN-string for the [Sentry](https://sentry.io/) project, the n
You can take metrics by specifying an endpoint for Prometheus in the configuration.
#### Available metrics

| name | description | fields |
|-----------------------|--------------------------------------|--------------------|
| published_events | the total number of published events | `subject`, `table` |
| filter_skipped_events | the total number of skipped events | `table` |
| name | description | fields |
|-----------------------------|--------------------------------------|--------------------|
| published_events_total | the total number of published events | `subject`, `table` |
| filter_skipped_events_total | the total number of skipped events | `table` |


## Docker
Expand Down
3 changes: 1 addition & 2 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg *config.DatabaseCfg, logger *slog.Logger) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgxConf := pgx.ConnConfig{
// TODO logger
LogLevel: pgx.LogLevelInfo,
Logger: pgxLogger{logger},
Host: cfg.Host,
Expand Down Expand Up @@ -76,7 +75,7 @@ func factoryPublisher(cfg *config.PublisherCfg, logger *slog.Logger) (eventPubli
return nil, fmt.Errorf("create stream: %w", err)
}

return pub, err
return pub, nil
default:
return nil, fmt.Errorf("unknown publisher type: %s", cfg.Type)
}
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
PublisherTypeKafka PublisherType = "kafka"
)

// Config for wal-listener/
// Config for wal-listener.
type Config struct {
Listener *ListenerCfg `valid:"required"`
Database *DatabaseCfg `valid:"required"`
Expand All @@ -36,7 +36,7 @@ type ListenerCfg struct {
TopicsMap map[string]string
}

// PublisherCfg represent configuration for any types pulisher.
// PublisherCfg represent configuration for any types publisher.
type PublisherCfg struct {
Type PublisherType `valid:"required"`
Address string `valid:"required"`
Expand Down
4 changes: 2 additions & 2 deletions config/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ type Metrics struct {
func NewMetrics() *Metrics {
return &Metrics{
publishedEvents: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "published_events",
Name: "published_events_total",
Help: "The total number of published events",
},
[]string{"app", "subject", "table"},
),
filterSkippedEvents: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "filter_skipped_events",
Name: "filter_skipped_events_total",
Help: "The total number of skipped events",
},
[]string{"app", "table"},
Expand Down
18 changes: 9 additions & 9 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewWalListener(
log *slog.Logger,
repo repository,
repl replication,
publ eventPublisher,
pub eventPublisher,
parser parser,
monitor monitor,
) *Listener {
Expand All @@ -81,7 +81,7 @@ func NewWalListener(
monitor: monitor,
slotName: cfg.Listener.SlotName,
cfg: cfg,
publisher: publ,
publisher: pub,
repository: repo,
replicator: repl,
parser: parser,
Expand Down Expand Up @@ -145,7 +145,7 @@ ProcessLoop:
}
case err := <-l.errChannel:
if errors.As(err, &svcErr) {
return err
return svcErr
}

logger.Error("listener: received error", "err", err)
Expand Down Expand Up @@ -210,23 +210,23 @@ func (l *Listener) Stream(ctx context.Context) {

for {
if err := ctx.Err(); err != nil {
l.errChannel <- newListenerError("stream: context canceled", err)
l.errChannel <- fmt.Errorf("stream: context canceled: %w", err)
break
}

msg, err := l.replicator.WaitForReplicationMessage(ctx)
if err != nil {
l.errChannel <- newListenerError("WaitForReplicationMessage", err)
l.errChannel <- newListenerError("stream: wait for replication message", err)
continue
}

if msg != nil {
if msg.WalMessage != nil {
l.log.Debug("receive wal message", slog.Uint64("wal", msg.WalMessage.WalStart))
l.log.Debug("receive WAL message", slog.Uint64("wal", msg.WalMessage.WalStart))

if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil {
l.log.Error("message parse failed", "err", err)
l.errChannel <- fmt.Errorf("parse wal message: %w", err)
l.errChannel <- fmt.Errorf("parse WAL message: %w", err)

continue
}
Expand Down Expand Up @@ -258,11 +258,11 @@ func (l *Listener) Stream(ctx context.Context) {

if msg.WalMessage.WalStart > l.readLSN() {
if err = l.AckWalMessage(msg.WalMessage.WalStart); err != nil {
l.errChannel <- fmt.Errorf("acknowledge wal message: %w", err)
l.errChannel <- fmt.Errorf("acknowledge WAL message: %w", err)
continue
}

l.log.Debug("ack wal msg", slog.Uint64("lsn", l.readLSN()))
l.log.Debug("ack WAL msg", slog.Uint64("lsn", l.readLSN()))
}
}

Expand Down
227 changes: 224 additions & 3 deletions listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,7 @@ func TestListener_Stream(t *testing.T) {
t.Errorf("- want + got\n- %#+v\n+ %#+v", want, got)
}
return ok
})).Return(err).
Once().
After(10 * time.Millisecond)
})).Return(err).Once().After(10 * time.Millisecond)
}

uuid.SetRand(bytes.NewReader(make([]byte, 512)))
Expand Down Expand Up @@ -856,3 +854,226 @@ func TestListener_Stream(t *testing.T) {
})
}
}

func TestListener_Process(t *testing.T) {
ctx := context.Background()
monitor := new(monitorMock)
parser := new(parserMock)
repo := new(repositoryMock)
repl := new(replicatorMock)
pub := new(publisherMock)

setCreatePublication := func(name string, err error) {
repo.On("CreatePublication", name).Return(err).Once()
}

setGetSlotLSN := func(slotName string, lsn string, err error) {
repo.On("GetSlotLSN", slotName).Return(lsn, err).Once()
}

setStartReplication := func(
err error,
slotName string,
startLsn uint64,
timeline int64,
pluginArguments ...string) {
repl.On("StartReplication", slotName, startLsn, timeline, pluginArguments).Return(err).Once()
}

setIsAlive := func(res bool) {
repl.On("IsAlive").Return(res)
}

setClose := func(err error) {
repl.On("Close").Return(err).Maybe()
}

setRepoClose := func(err error) {
repo.On("Close").Return(err)
}

setRepoIsAlive := func(res bool) {
repo.On("IsAlive").Return(res)
}

setWaitForReplicationMessage := func(mess *pgx.ReplicationMessage, err error) {
repl.On("WaitForReplicationMessage", mock.Anything).Return(mess, err)
}

setSendStandbyStatus := func(err error) {
repl.On("SendStandbyStatus", mock.Anything).Return(err)
}

setCreateReplicationSlotEx := func(slotName, outputPlugin, consistentPoint, snapshotName string, err error) {
repl.On("CreateReplicationSlotEx", slotName, outputPlugin).Return(consistentPoint, snapshotName, err)
}

tests := []struct {
name string
cfg *config.Config
setup func()
wantErr error
}{
{
name: "success",
cfg: &config.Config{
Listener: &config.ListenerCfg{
SlotName: "slot1",
AckTimeout: 0,
RefreshConnection: 1,
HeartbeatInterval: 2,
Filter: config.FilterStruct{
Tables: nil,
},
TopicsMap: nil,
},
},
setup: func() {
ctx, _ = context.WithTimeout(ctx, time.Millisecond*200)
setCreatePublication("wal-listener", nil)
setGetSlotLSN("slot1", "100/200", nil)
setStartReplication(
nil,
"slot1",
1099511628288,
-1,
"proto_version '1'",
"publication_names 'wal-listener'",
)
setIsAlive(true)
setRepoIsAlive(true)
setWaitForReplicationMessage(nil, nil)
setSendStandbyStatus(nil)
setClose(nil)
setRepoClose(nil)
},
wantErr: nil,
},
{
name: "skip create publication",
cfg: &config.Config{
Listener: &config.ListenerCfg{
SlotName: "slot1",
AckTimeout: 0,
RefreshConnection: 1,
HeartbeatInterval: 2,
Filter: config.FilterStruct{
Tables: nil,
},
TopicsMap: nil,
},
},
setup: func() {
ctx, _ = context.WithTimeout(ctx, time.Millisecond*20)
setCreatePublication("wal-listener", errors.New("some err"))
setGetSlotLSN("slot1", "100/200", nil)
setStartReplication(
nil,
"slot1",
1099511628288,
-1,
"proto_version '1'",
"publication_names 'wal-listener'",
)
setIsAlive(true)
setRepoIsAlive(true)
setWaitForReplicationMessage(nil, nil)
setSendStandbyStatus(nil)
setClose(nil)
setRepoClose(nil)
},
wantErr: nil,
},
{
name: "get slot error",
cfg: &config.Config{
Listener: &config.ListenerCfg{
SlotName: "slot1",
AckTimeout: 0,
RefreshConnection: 1,
HeartbeatInterval: 2,
Filter: config.FilterStruct{
Tables: nil,
},
TopicsMap: nil,
},
},
setup: func() {
ctx, _ = context.WithTimeout(ctx, time.Millisecond*20)
setCreatePublication("wal-listener", nil)
setGetSlotLSN("slot1", "100/200", errors.New("some err"))
},
wantErr: errors.New("slot is exists: get slot lsn: some err"),
},
{
name: "slot does not exists",
cfg: &config.Config{
Listener: &config.ListenerCfg{
SlotName: "slot1",
AckTimeout: 0,
RefreshConnection: 1,
HeartbeatInterval: 2,
Filter: config.FilterStruct{
Tables: nil,
},
TopicsMap: nil,
},
},
setup: func() {
ctx, _ = context.WithTimeout(ctx, time.Millisecond*20)
setCreatePublication("wal-listener", nil)
setGetSlotLSN("slot1", "", nil)
setCreateReplicationSlotEx(
"slot1",
"pgoutput",
"100/200",
"",
nil,
)
setStartReplication(
nil,
"slot1",
1099511628288,
-1,
"proto_version '1'",
"publication_names 'wal-listener'",
)
setIsAlive(true)
setRepoIsAlive(true)
setWaitForReplicationMessage(nil, nil)
setSendStandbyStatus(nil)
setClose(nil)
setRepoClose(nil)
},
wantErr: nil,
},
}

logger := slog.New(slog.NewJSONHandler(io.Discard, nil))

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer repo.AssertExpectations(t)
defer repl.AssertExpectations(t)

tt.setup()

l := NewWalListener(
tt.cfg,
logger,
repo,
repl,
pub,
parser,
monitor,
)

err := l.Process(ctx)
if err != nil && assert.Error(t, tt.wantErr, err.Error()) {
assert.EqualError(t, err, tt.wantErr.Error())
} else {
assert.NoError(t, tt.wantErr)
}
})
}
}
2 changes: 2 additions & 0 deletions listener/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error {
default:
return fmt.Errorf("%w : %s", errUnknownMessageType, []byte{p.msgType})
}

return nil
}

Expand Down Expand Up @@ -192,6 +193,7 @@ func (p *BinaryParser) getUpdateMsg() Update {
u.RelationID = p.readInt32()
u.KeyTuple = p.charIsExists('K')
u.OldTuple = p.charIsExists('O')

if u.KeyTuple || u.OldTuple {
u.OldRow = p.readTupleData()
}
Expand Down
File renamed without changes.
Loading

0 comments on commit d618dbd

Please sign in to comment.