Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 53 additions & 6 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
var events []abci.Event
var publishEvents sdk.PublishEvents

if err := app.checkHalt(req.Height, req.Time); err != nil {
return nil, err
Comment on lines 711 to 717

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).internalFinalizeBlock (baseapp/abci.go:712)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).FinalizeBlock (baseapp/abci.go:893)

Expand Down Expand Up @@ -803,6 +804,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// continue
}

publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...)
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
Expand All @@ -814,7 +816,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults, err := app.executeTxs(ctx, req.Txs)
txResults, txEventSet, err := app.executeTxs(ctx, req.Txs)
if err != nil {
return nil, err
}
Expand All @@ -823,6 +825,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager()))

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
if err != nil {
return nil, err
Expand All @@ -836,9 +840,23 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// continue
}

publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...)

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())

events, trueOrder := filterOutPublishEvents(events)
app.flushData = PublishEventFlush{
Height: header.Height,
PrevAppHash: header.AppHash,
BlockEvents: EventSet{
AbciEvents: events,
PublishEvents: publishEvents,
TrueOrder: trueOrder,
},
TxEvents: txEventSet,
}

return &abci.FinalizeBlockResponse{
Events: events,
TxResults: txResults,
Expand All @@ -847,11 +865,15 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, []EventSet, error) {
txResults := make([]*abci.ExecTxResult, 0, len(txs))
txEventSet := make([]EventSet, 0)

for txIdx, rawTx := range txs {
var response *abci.ExecTxResult

app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager()))

if memTx, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, memTx, txIdx)
} else {
Expand All @@ -870,14 +892,36 @@ func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecT
// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, nil, ctx.Err()
default:
// continue
}

filtered, order := filterOutPublishEvents(response.Events)
response.Events = filtered
txResults = append(txResults, response)
txEventSet = append(txEventSet, EventSet{
AbciEvents: response.Events,
PublishEvents: app.finalizeBlockState.Context().PublishEventManager().Events(),
TrueOrder: order,
})
}
return txResults, nil
return txResults, txEventSet, nil
}

func filterOutPublishEvents(events []abci.Event) ([]abci.Event, []EventType) {
var filteredEvents []abci.Event
var trueOrder []EventType

for _, e := range events {
if e.Type == sdk.PlaceholderEventType {
trueOrder = append(trueOrder, EventTypePublish)
continue
}
filteredEvents = append(filteredEvents, e)
trueOrder = append(trueOrder, EventTypeAbci)
}

return filteredEvents, trueOrder
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
Expand Down Expand Up @@ -967,7 +1011,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
rms.SetCommitHeader(header)
}

app.cms.Commit()
commitId := app.cms.Commit()

app.flushData.NewAppHash = commitId.Hash
app.PublishBlockEvents(app.flushData)

resp := &abci.CommitResponse{
RetainHeight: retainHeight,
Expand Down
8 changes: 4 additions & 4 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func TestABCI_CheckTx(t *testing.T) {
anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, counterKey)) }
suite := NewBaseAppSuite(t, anteOpt)

baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey, false})

nTxs := int64(5)
_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) {
require.NoError(t, err)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

nBlocks := 3
txPerHeight := 5
Expand Down Expand Up @@ -668,10 +668,10 @@ func TestABCI_FinalizeBlock_MultiMsg(t *testing.T) {
require.NoError(t, err)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

deliverKey2 := []byte("deliver-key2")
baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2})
baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2, false})

// run a multi-msg tx
// with all msgs the same route
Expand Down
5 changes: 5 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ type BaseApp struct {
EnableStreamer bool
StreamEvents chan StreamEvents

EnablePublish bool
PublishEvents chan PublishEventFlush
flushData PublishEventFlush

traceFlightRecorder *metrics.TraceRecorder
}

Expand All @@ -227,6 +231,7 @@ func NewBaseApp(
sigverifyTx: true,
queryGasLimit: math.MaxUint64,
StreamEvents: make(chan StreamEvents),
PublishEvents: make(chan PublishEventFlush),
}

for _, option := range options {
Expand Down
6 changes: 3 additions & 3 deletions baseapp/baseapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestAnteHandlerGasMeter(t *testing.T) {
require.NoError(t, err)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

tx := newTxCounter(t, suite.txConfig, 0, 0)
txBytes, err := suite.txConfig.TxEncoder()(tx)
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestBaseAppAnteHandler(t *testing.T) {
suite := NewBaseAppSuite(t, anteOpt)

deliverKey := []byte("deliver-key")
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})

_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
ConsensusParams: &cmtproto.ConsensusParams{},
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestBaseAppPostHandler(t *testing.T) {
}

suite := NewBaseAppSuite(t, anteOpt)
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo")})
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo"), false})

_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
ConsensusParams: &cmtproto.ConsensusParams{},
Expand Down
33 changes: 33 additions & 0 deletions baseapp/publish_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package baseapp

import (
abci "github.com/cometbft/cometbft/abci/types"
types "github.com/cosmos/cosmos-sdk/types"
)

type EventType byte

const (
EventTypeAbci EventType = iota
EventTypePublish
)

type EventSet struct {
AbciEvents []abci.Event
PublishEvents types.PublishEvents
TrueOrder []EventType
}

type PublishEventFlush struct {
Height int64
PrevAppHash []byte
NewAppHash []byte
BlockEvents EventSet
TxEvents []EventSet
}

func (app *BaseApp) PublishBlockEvents(flush PublishEventFlush) {
if app.EnablePublish {
app.PublishEvents <- flush
}
}
Loading
Loading