diff --git a/baseapp/abci.go b/baseapp/abci.go index 1b357bc67577..cbc0d8885b53 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -711,6 +711,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r // must be used. func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) { var events []abci.Event + var publishEvents sdk.PublishEvents if err := app.checkHalt(req.Height, req.Time); err != nil { return nil, err @@ -803,6 +804,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // continue } + publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...) events = append(events, beginBlock.Events...) // Reset the gas meter so that the AnteHandlers aren't required to @@ -814,7 +816,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // // NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g. // vote extensions, so skip those. - txResults, err := app.executeTxs(ctx, req.Txs) + txResults, txEventSet, err := app.executeTxs(ctx, req.Txs) if err != nil { return nil, err } @@ -823,6 +825,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager())) + endBlock, err := app.endBlock(app.finalizeBlockState.Context()) if err != nil { return nil, err @@ -836,9 +840,23 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // continue } + publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...) + events = append(events, endBlock.Events...) cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + events, trueOrder := filterOutPublishEvents(events) + app.flushData = PublishEventFlush{ + Height: header.Height, + PrevAppHash: header.AppHash, + BlockEvents: EventSet{ + AbciEvents: events, + PublishEvents: publishEvents, + TrueOrder: trueOrder, + }, + TxEvents: txEventSet, + } + return &abci.FinalizeBlockResponse{ Events: events, TxResults: txResults, @@ -847,11 +865,15 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz }, nil } -func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) { +func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, []EventSet, error) { txResults := make([]*abci.ExecTxResult, 0, len(txs)) + txEventSet := make([]EventSet, 0) + for txIdx, rawTx := range txs { var response *abci.ExecTxResult + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager())) + if memTx, err := app.txDecoder(rawTx); err == nil { response = app.deliverTx(rawTx, memTx, txIdx) } else { @@ -870,14 +892,36 @@ func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecT // check after every tx if we should abort select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, nil, ctx.Err() default: // continue } - + filtered, order := filterOutPublishEvents(response.Events) + response.Events = filtered txResults = append(txResults, response) + txEventSet = append(txEventSet, EventSet{ + AbciEvents: response.Events, + PublishEvents: app.finalizeBlockState.Context().PublishEventManager().Events(), + TrueOrder: order, + }) } - return txResults, nil + return txResults, txEventSet, nil +} + +func filterOutPublishEvents(events []abci.Event) ([]abci.Event, []EventType) { + var filteredEvents []abci.Event + var trueOrder []EventType + + for _, e := range events { + if e.Type == sdk.PlaceholderEventType { + trueOrder = append(trueOrder, EventTypePublish) + continue + } + filteredEvents = append(filteredEvents, e) + trueOrder = append(trueOrder, EventTypeAbci) + } + + return filteredEvents, trueOrder } // FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock. @@ -967,7 +1011,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) { rms.SetCommitHeader(header) } - app.cms.Commit() + commitId := app.cms.Commit() + + app.flushData.NewAppHash = commitId.Hash + app.PublishBlockEvents(app.flushData) resp := &abci.CommitResponse{ RetainHeight: retainHeight, diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index c43eb4dd9d26..d093ac9a0f86 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -564,7 +564,7 @@ func TestABCI_CheckTx(t *testing.T) { anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, counterKey)) } suite := NewBaseAppSuite(t, anteOpt) - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey, false}) nTxs := int64(5) _, err := suite.baseApp.InitChain(&abci.InitChainRequest{ @@ -618,7 +618,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) { require.NoError(t, err) deliverKey := []byte("deliver-key") - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false}) nBlocks := 3 txPerHeight := 5 @@ -668,10 +668,10 @@ func TestABCI_FinalizeBlock_MultiMsg(t *testing.T) { require.NoError(t, err) deliverKey := []byte("deliver-key") - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false}) deliverKey2 := []byte("deliver-key2") - baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2}) + baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2, false}) // run a multi-msg tx // with all msgs the same route diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index c64cf3447b7b..23c582013fdc 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -205,6 +205,10 @@ type BaseApp struct { EnableStreamer bool StreamEvents chan StreamEvents + EnablePublish bool + PublishEvents chan PublishEventFlush + flushData PublishEventFlush + traceFlightRecorder *metrics.TraceRecorder } @@ -227,6 +231,7 @@ func NewBaseApp( sigverifyTx: true, queryGasLimit: math.MaxUint64, StreamEvents: make(chan StreamEvents), + PublishEvents: make(chan PublishEventFlush), } for _, option := range options { diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index 8e228ae379db..cebf8498831d 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -219,7 +219,7 @@ func TestAnteHandlerGasMeter(t *testing.T) { require.NoError(t, err) deliverKey := []byte("deliver-key") - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false}) tx := newTxCounter(t, suite.txConfig, 0, 0) txBytes, err := suite.txConfig.TxEncoder()(tx) @@ -562,7 +562,7 @@ func TestBaseAppAnteHandler(t *testing.T) { suite := NewBaseAppSuite(t, anteOpt) deliverKey := []byte("deliver-key") - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false}) _, err := suite.baseApp.InitChain(&abci.InitChainRequest{ ConsensusParams: &cmtproto.ConsensusParams{}, @@ -636,7 +636,7 @@ func TestBaseAppPostHandler(t *testing.T) { } suite := NewBaseAppSuite(t, anteOpt) - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo")}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo"), false}) _, err := suite.baseApp.InitChain(&abci.InitChainRequest{ ConsensusParams: &cmtproto.ConsensusParams{}, diff --git a/baseapp/publish_event.go b/baseapp/publish_event.go new file mode 100644 index 000000000000..0bde3d005e78 --- /dev/null +++ b/baseapp/publish_event.go @@ -0,0 +1,33 @@ +package baseapp + +import ( + abci "github.com/cometbft/cometbft/abci/types" + types "github.com/cosmos/cosmos-sdk/types" +) + +type EventType byte + +const ( + EventTypeAbci EventType = iota + EventTypePublish +) + +type EventSet struct { + AbciEvents []abci.Event + PublishEvents types.PublishEvents + TrueOrder []EventType +} + +type PublishEventFlush struct { + Height int64 + PrevAppHash []byte + NewAppHash []byte + BlockEvents EventSet + TxEvents []EventSet +} + +func (app *BaseApp) PublishBlockEvents(flush PublishEventFlush) { + if app.EnablePublish { + app.PublishEvents <- flush + } +} diff --git a/baseapp/publish_event_test.go b/baseapp/publish_event_test.go new file mode 100644 index 000000000000..699962f045df --- /dev/null +++ b/baseapp/publish_event_test.go @@ -0,0 +1,204 @@ +package baseapp_test + +import ( + "fmt" + "testing" + + "cosmossdk.io/log" + abci "github.com/cometbft/cometbft/abci/types" + cmtproto "github.com/cometbft/cometbft/api/cometbft/types/v1" + dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/require" + + "github.com/cosmos/cosmos-sdk/baseapp" + baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil" + "github.com/cosmos/cosmos-sdk/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ types.PublishEvent = (*StringPublishEvent)(nil) + +type StringPublishEvent struct { + data string +} + +func (e StringPublishEvent) ToString() string { + return e.data +} + +func (e StringPublishEvent) Serialize() []byte { + return []byte(e.data) +} + +func getStreamEventFlushChan(app *baseapp.BaseApp) chan baseapp.PublishEventFlush { + app.EnablePublish = true + publishEventChan := make(chan baseapp.PublishEventFlush) + + go func() { + for { + select { + case e := <-app.PublishEvents: + publishEventChan <- e + } + } + }() + + return publishEventChan +} + +func TestPublishEvent_FinalizeBlock_WithBeginAndEndBlocker(t *testing.T) { + name := t.Name() + db := dbm.NewMemDB() + app := baseapp.NewBaseApp(name, log.NewTestLogger(t), db, nil) + + publishEventChan := getStreamEventFlushChan(app) + + app.SetBeginBlocker(func(ctx sdk.Context) (sdk.BeginBlock, error) { + ctx = ctx.WithEventManager(sdk.NewEventManager()) + ctx.EventManager().EmitEvent(types.Event{ + Type: "sometype", + Attributes: []abci.EventAttribute{ + { + Key: "foo", + Value: "bar", + }, + }, + }) + ctx.PublishEventManager().EmitEvent(StringPublishEvent{"sometype2"}) + + return sdk.BeginBlock{ + Events: ctx.EventManager().ABCIEvents(), + }, nil + }) + + app.SetEndBlocker(func(ctx sdk.Context) (sdk.EndBlock, error) { + ctx = ctx.WithEventManager(sdk.NewEventManager()) + ctx.EventManager().EmitEvent(types.Event{ + Type: "anothertype", + Attributes: []abci.EventAttribute{ + { + Key: "foo", + Value: "bar", + }, + }, + }) + + ctx.PublishEventManager().EmitEvent(StringPublishEvent{"anothertype2"}) + + return sdk.EndBlock{ + Events: ctx.EventManager().ABCIEvents(), + }, nil + }) + + _, err := app.InitChain( + &abci.InitChainRequest{ + InitialHeight: 1, + }, + ) + require.NoError(t, err) + + res, err := app.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1}) + require.NoError(t, err) + + require.Len(t, res.Events, 2) + + require.Equal(t, "sometype", res.Events[0].Type) + require.Equal(t, "foo", res.Events[0].Attributes[0].Key) + require.Equal(t, "bar", res.Events[0].Attributes[0].Value) + require.Equal(t, "mode", res.Events[0].Attributes[1].Key) + require.Equal(t, "BeginBlock", res.Events[0].Attributes[1].Value) + + require.Equal(t, "anothertype", res.Events[1].Type) + require.Equal(t, "foo", res.Events[1].Attributes[0].Key) + require.Equal(t, "bar", res.Events[1].Attributes[0].Value) + require.Equal(t, "mode", res.Events[1].Attributes[1].Key) + require.Equal(t, "EndBlock", res.Events[1].Attributes[1].Value) + + _, err = app.Commit() + require.NoError(t, err) + + require.Equal(t, int64(1), app.LastBlockHeight()) + + pevts := <-publishEventChan + require.Len(t, pevts.BlockEvents.PublishEvents, 2) + require.Equal(t, StringPublishEvent{"sometype2"}, pevts.BlockEvents.PublishEvents[0]) + require.Equal(t, StringPublishEvent{"anothertype2"}, pevts.BlockEvents.PublishEvents[1]) + + require.Len(t, pevts.BlockEvents.AbciEvents, 2) + + require.Len(t, pevts.BlockEvents.TrueOrder, 4) + require.Equal(t, pevts.BlockEvents.TrueOrder, []baseapp.EventType{ + baseapp.EventTypeAbci, baseapp.EventTypePublish, baseapp.EventTypeAbci, baseapp.EventTypePublish, + }) + +} + +func TestPublishEvent_FinalizeBlock_DeliverTx(t *testing.T) { + anteKey := []byte("ante-key") + anteOpt := func(bapp *baseapp.BaseApp) { + bapp.SetAnteHandler(anteHandlerTxTestWithCustomEventEmit(t, capKey1, anteKey, true)) + } + suite := NewBaseAppSuite(t, anteOpt) + publishEventChan := getStreamEventFlushChan(suite.baseApp) + + _, err := suite.baseApp.InitChain(&abci.InitChainRequest{ + ConsensusParams: &cmtproto.ConsensusParams{}, + }) + require.NoError(t, err) + + deliverKey := []byte("deliver-key") + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, true}) + + nBlocks := 3 + txPerHeight := 5 + + var lastAppHash []byte + + for blockN := 0; blockN < nBlocks; blockN++ { + + txs := [][]byte{} + for i := 0; i < txPerHeight; i++ { + counter := int64(blockN*txPerHeight + i) + tx := newTxCounter(t, suite.txConfig, counter, counter) + + txBytes, err := suite.txConfig.TxEncoder()(tx) + require.NoError(t, err) + + txs = append(txs, txBytes) + } + + res, err := suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{ + Height: int64(blockN) + 1, + Txs: txs, + }) + require.NoError(t, err) + + for i := 0; i < txPerHeight; i++ { + counter := int64(blockN*txPerHeight + i) + require.True(t, res.TxResults[i].IsOK(), fmt.Sprintf("%v", res)) + + events := res.TxResults[i].GetEvents() + require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively") + require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event") + require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[2].Attributes[0], "msg handler update counter event") + } + + _, err = suite.baseApp.Commit() + require.NoError(t, err) + + pevts := <-publishEventChan + require.Equal(t, pevts.Height, int64(blockN+1)) + if blockN > 0 { + require.Equal(t, pevts.PrevAppHash, lastAppHash, "should be the same as last app hash") + } + require.Equal(t, pevts.NewAppHash, res.AppHash) + require.Len(t, pevts.TxEvents, txPerHeight) + fmt.Println(pevts.TxEvents) + for i := 0; i < txPerHeight; i++ { + require.Len(t, pevts.TxEvents[i].PublishEvents, 2) + require.Len(t, pevts.TxEvents[i].TrueOrder, 5) + } + + lastAppHash = res.AppHash + } +} \ No newline at end of file diff --git a/baseapp/streaming_test.go b/baseapp/streaming_test.go index 74f786e25e3a..8837d8bda7a7 100644 --- a/baseapp/streaming_test.go +++ b/baseapp/streaming_test.go @@ -58,7 +58,7 @@ func TestABCI_MultiListener_StateChanges(t *testing.T) { ) deliverKey := []byte("deliver-key") - baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false}) nBlocks := 3 txPerHeight := 5 diff --git a/baseapp/utils_test.go b/baseapp/utils_test.go index e2aea7296eea..d32632ec3f39 100644 --- a/baseapp/utils_test.go +++ b/baseapp/utils_test.go @@ -116,6 +116,32 @@ func (m MsgKeyValueImpl) Set(ctx context.Context, msg *baseapptestutil.MsgKeyVal return &baseapptestutil.MsgCreateKeyValueResponse{}, nil } +var _ sdk.PublishEvent = (*TestPublishEvent)(nil) + +type TestPublishEvent struct { + Type string `json:"type,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` +} + +func NewTestPublishEvent(typ string, attributes map[string]string) *TestPublishEvent { + return &TestPublishEvent{ + Type: typ, + Attributes: attributes, + } +} + +func (b *TestPublishEvent) Serialize() []byte { + bz, err := json.Marshal(b) + if err != nil { + panic(err) + } + return bz +} + +func (b *TestPublishEvent) ToString() string { + return fmt.Sprintf("%s: %s", b.Type, b.Attributes) +} + type CounterServerImplGasMeterOnly struct { gas uint64 } @@ -146,20 +172,24 @@ type CounterServerImpl struct { t *testing.T capKey storetypes.StoreKey deliverKey []byte + + emitCustomEvent bool } func (m CounterServerImpl) IncrementCounter(ctx context.Context, msg *baseapptestutil.MsgCounter) (*baseapptestutil.MsgCreateCounterResponse, error) { - return incrementCounter(ctx, m.t, m.capKey, m.deliverKey, msg) + return incrementCounter(ctx, m.t, m.capKey, m.deliverKey, msg, m.emitCustomEvent) } type Counter2ServerImpl struct { t *testing.T capKey storetypes.StoreKey deliverKey []byte + + emitCustomEvent bool } func (m Counter2ServerImpl) IncrementCounter(ctx context.Context, msg *baseapptestutil.MsgCounter2) (*baseapptestutil.MsgCreateCounterResponse, error) { - return incrementCounter(ctx, m.t, m.capKey, m.deliverKey, msg) + return incrementCounter(ctx, m.t, m.capKey, m.deliverKey, msg, m.emitCustomEvent) } func incrementCounter(ctx context.Context, @@ -167,6 +197,7 @@ func incrementCounter(ctx context.Context, capKey storetypes.StoreKey, deliverKey []byte, msg sdk.Msg, + emitCustomEvent bool, ) (*baseapptestutil.MsgCreateCounterResponse, error) { sdkCtx := sdk.UnwrapSDKContext(ctx) store := sdkCtx.KVStore(capKey) @@ -192,6 +223,12 @@ func incrementCounter(ctx context.Context, counterEvent(sdk.EventTypeMessage, msgCount), ) + if emitCustomEvent { + sdkCtx.PublishEventManager().EmitEvent( + NewTestPublishEvent("message", map[string]string{"counter": fmt.Sprintf("%d", msgCount)}), + ) + } + _, err := incrementingCounter(t, store, deliverKey, msgCount) if err != nil { return nil, err @@ -210,6 +247,10 @@ func counterEvent(evType string, msgCount int64) sdk.Events { } func anteHandlerTxTest(t *testing.T, capKey storetypes.StoreKey, storeKey []byte) sdk.AnteHandler { + return anteHandlerTxTestWithCustomEventEmit(t, capKey, storeKey, false) +} + +func anteHandlerTxTestWithCustomEventEmit(t *testing.T, capKey storetypes.StoreKey, storeKey []byte, emitCustomEvent bool) sdk.AnteHandler { return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) { store := ctx.KVStore(capKey) counter, failOnAnte := parseTxMemo(t, tx) @@ -227,6 +268,12 @@ func anteHandlerTxTest(t *testing.T, capKey storetypes.StoreKey, storeKey []byte counterEvent("ante_handler", counter), ) + if emitCustomEvent { + ctx.PublishEventManager().EmitEvent( + NewTestPublishEvent("ante_handler", map[string]string{"counter": fmt.Sprintf("%d", counter)}), + ) + } + ctx = ctx.WithPriority(testTxPriority) return ctx, nil } diff --git a/server/config/config.go b/server/config/config.go index f046a9e43fec..4fd9c9207e72 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -173,6 +173,7 @@ type ( // StreamingConfig defines application configuration for external streaming services StreamingConfig struct { ABCI ABCIListenerConfig `mapstructure:"abci"` + MQPub MQPubConfig `mapstructure:"mqpub"` } // ABCIListenerConfig defines application configuration for ABCIListener streaming service ABCIListenerConfig struct { @@ -180,6 +181,14 @@ type ( Plugin string `mapstructure:"plugin"` StopNodeOnErr bool `mapstructure:"stop-node-on-err"` } + + // MQPubConfig defines application configuration for MessageQueue publish service + MQPubConfig struct { + Enabled bool `mapstructure:"enabled"` + SeedBrokers []string `mapstructure:"seed-brokers"` + TopicName string `mapstructure:"topic-name"` + ControlPort int `mapstructure:"control-port"` + } ) // Config defines the server's top level configuration @@ -261,6 +270,12 @@ func DefaultConfig() *Config { Keys: []string{}, StopNodeOnErr: true, }, + MQPub: MQPubConfig{ + Enabled: false, + SeedBrokers: []string{}, + TopicName: "", + ControlPort: 0, + }, }, Mempool: MempoolConfig{ MaxTxs: -1, diff --git a/server/config/config_test.go b/server/config/config_test.go index bd5cf061b350..9223f49a19d3 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -52,6 +52,12 @@ func TestStreamingConfig(t *testing.T) { Plugin: "plugin-A", StopNodeOnErr: false, }, + MQPub: MQPubConfig{ + Enabled: false, + SeedBrokers: []string{}, + TopicName: "", + ControlPort: 0, + }, }, } @@ -68,8 +74,11 @@ func TestStreamingConfig(t *testing.T) { `keys = ["one", "two", ]`, `plugin = "plugin-A"`, `stop-node-on-err = false`, + `enabled = false`, + `seed-brokers = []`, + `topic-name = ""`, + `control-port = 0`, } - for _, line := range expectedLines { assert.Contains(t, cfgFileContents, line+"\n", "config file contents") } diff --git a/server/config/toml.go b/server/config/toml.go index 94d59d357aa4..ac6f8f0cafe1 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -234,6 +234,22 @@ plugin = "{{ .Streaming.ABCI.Plugin }}" # stop-node-on-err specifies whether to stop the node on message delivery error. stop-node-on-err = {{ .Streaming.ABCI.StopNodeOnErr }} +# streaming.mqpub specifies the configuration for the MessageQueue publish service. +[streaming.mqpub] +# enable defines if the message queue publish service should be enabled. +enabled = {{ .Streaming.MQPub.Enabled }} +# seed-brokers defines where to be connected when the message gets published. +{{- if .Streaming.MQPub.SeedBrokers }} +seed-brokers = [{{- range $i, $e := .Streaming.MQPub.SeedBrokers }}{{ if $i }}, {{ end }}"{{ $e }}"{{- end }}] +{{- else }} +seed-brokers = [] +{{- end }} +# topic-name defines the to be connected when message gets published. +topic-name = "{{ .Streaming.MQPub.TopicName }}" +# control-port defines the server port for mq control +control-port = {{ .Streaming.MQPub.ControlPort }} + + ############################################################################### ### Mempool ### ############################################################################### diff --git a/types/context.go b/types/context.go index cacec242c1ce..cd4de869eab6 100644 --- a/types/context.go +++ b/types/context.go @@ -59,6 +59,7 @@ type Context struct { minGasPrice DecCoins consParams cmtproto.ConsensusParams eventManager EventManagerI + publishEventManager PublishEventManagerI priority int64 // The tx priority, only relevant in CheckTx kvGasConfig storetypes.GasConfig transientKVGasConfig storetypes.GasConfig @@ -92,6 +93,9 @@ func (c Context) IsSigverifyTx() bool { return c.sigve func (c Context) ExecMode() ExecMode { return c.execMode } func (c Context) MinGasPrices() DecCoins { return c.minGasPrice } func (c Context) EventManager() EventManagerI { return c.eventManager } +func (c Context) PublishEventManager() PublishEventManagerI { + return &EventPlaceholderManager{c.eventManager, c.publishEventManager} +} func (c Context) Priority() int64 { return c.priority } func (c Context) KVGasConfig() storetypes.GasConfig { return c.kvGasConfig } func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.transientKVGasConfig } @@ -145,6 +149,7 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool gasMeter: storetypes.NewInfiniteGasMeter(), minGasPrice: DecCoins{}, eventManager: NewEventManager(), + publishEventManager: NewPublishEventManager(), kvGasConfig: storetypes.KVGasConfig(), transientKVGasConfig: storetypes.TransientGasConfig(), txIndex: -1, @@ -302,6 +307,12 @@ func (c Context) WithEventManager(em EventManagerI) Context { return c } +// WithPublishEventManager returns a Context with an updated publish event manager +func (c Context) WithPublishEventManager(sem PublishEventManagerI) Context { + c.publishEventManager = sem + return c +} + // WithPriority returns a Context with an updated tx priority func (c Context) WithPriority(p int64) Context { c.priority = p @@ -381,10 +392,19 @@ func (c Context) ObjectStore(key storetypes.StoreKey) storetypes.ObjKVStore { // EventManager when the caller executes the write. func (c Context) CacheContext() (cc Context, writeCache func()) { cms := c.ms.CacheMultiStore() - cc = c.WithMultiStore(cms).WithEventManager(NewEventManager()) + cc = c.WithMultiStore(cms).WithEventManager(NewEventManager()).WithPublishEventManager(NewPublishEventManager()) writeCache = func() { c.EventManager().EmitEvents(cc.EventManager().Events()) + + pem := c.PublishEventManager() + // EventPlaceholderManager already emitted event placeholders to the EventManager + // so we do not emit them again by unwrapping it. + if pem.(*EventPlaceholderManager) != nil { + pem = pem.(*EventPlaceholderManager).publishEventManager + } + pem.EmitEvents(cc.PublishEventManager().Events()) + cms.Write() } diff --git a/types/events_placeholder.go b/types/events_placeholder.go new file mode 100644 index 000000000000..f8549b841aa8 --- /dev/null +++ b/types/events_placeholder.go @@ -0,0 +1,30 @@ +package types + +const PlaceholderEventType = "publish event placeholder" + +var _ PublishEventManagerI = (*EventPlaceholderManager)(nil) + +type EventPlaceholderManager struct { + eventManager EventManagerI + publishEventManager PublishEventManagerI +} + +func (e *EventPlaceholderManager) Events() PublishEvents { + return e.publishEventManager.Events() +} + +func (e *EventPlaceholderManager) EmitEvent(event PublishEvent) { + e.publishEventManager.EmitEvent(event) + placeholder := NewEvent(PlaceholderEventType) + e.eventManager.EmitEvent(placeholder) +} + +func (e *EventPlaceholderManager) EmitEvents(events PublishEvents) { + e.publishEventManager.EmitEvents(events) + placeholders := make(Events, 0, len(events)) + for _, _ = range events { + placeholder := NewEvent(PlaceholderEventType) + placeholders = append(placeholders, placeholder) + } + e.eventManager.EmitEvents(placeholders) +} diff --git a/types/publish_events.go b/types/publish_events.go new file mode 100644 index 000000000000..663c48dd84dc --- /dev/null +++ b/types/publish_events.go @@ -0,0 +1,40 @@ +package types + +type PublishEventManagerI interface { + Events() PublishEvents + EmitEvent(event PublishEvent) + EmitEvents(events PublishEvents) +} + +var _ PublishEventManagerI = (*PublishEventManager)(nil) + +// PublishEventManager implements a simple wrapper around a slice of PublishEvent objects that +// can be emitted from. +type PublishEventManager struct { + events PublishEvents +} + +func NewPublishEventManager() *PublishEventManager { + return &PublishEventManager{EmptyPublishEvents()} +} + +func (em *PublishEventManager) Events() PublishEvents { return em.events } + +func (em *PublishEventManager) EmitEvent(event PublishEvent) { + em.events = append(em.events, event) +} + +func (em *PublishEventManager) EmitEvents(events PublishEvents) { + em.events = append(em.events, events...) +} + +type PublishEvent interface { + Serialize() []byte +} + +type PublishEvents []PublishEvent + +// EmptyPublishEvents returns an empty slice of events. +func EmptyPublishEvents() PublishEvents { + return make(PublishEvents, 0) +} \ No newline at end of file