diff --git a/cmd/rpc/routes.go b/cmd/rpc/routes.go index 0e412525..27a412d3 100644 --- a/cmd/rpc/routes.go +++ b/cmd/rpc/routes.go @@ -62,6 +62,7 @@ const ( ValidatorSetRoutePath = "/v1/query/validator-set" CheckpointRoutePath = "/v1/query/checkpoint" SubscribeRCInfoPath = "/v1/subscribe-rc-info" + SubscribeBlockDataPath = "/v1/subscribe-block-data" // debug DebugBlockedRoutePath = "/debug/blocked" DebugHeapRoutePath = "/debug/heap" @@ -201,6 +202,7 @@ const ( AddVoteRouteName = "add-vote" DelVoteRouteName = "del-vote" SubscribeRCInfoName = "subscribe-rc-info" + SubscribeBlockDataName = "subscribe-block-data" ) // routes contains the method and path for a canopy command @@ -306,6 +308,7 @@ var routePaths = routes{ AddVoteRouteName: {Method: http.MethodPost, Path: AddVoteRoutePath}, DelVoteRouteName: {Method: http.MethodPost, Path: DelVoteRoutePath}, SubscribeRCInfoName: {Method: http.MethodGet, Path: SubscribeRCInfoPath}, + SubscribeBlockDataName: {Method: http.MethodGet, Path: SubscribeBlockDataPath}, } // httpRouteHandlers is a custom type that maps strings to httprouter handle functions @@ -369,6 +372,7 @@ func createRouter(s *Server) *httprouter.Router { CheckpointRouteName: s.Checkpoint, EthereumRouteName: s.EthereumHandler, SubscribeRCInfoName: s.WebSocket, + SubscribeBlockDataName: s.BlockDataWebSocket, } // Initialize a new router using the httprouter package. diff --git a/cmd/rpc/sock.go b/cmd/rpc/sock.go index 9453d93e..c83e0ae3 100644 --- a/cmd/rpc/sock.go +++ b/cmd/rpc/sock.go @@ -10,7 +10,9 @@ import ( "time" "github.com/canopy-network/canopy/controller" + "github.com/canopy-network/canopy/fsm" "github.com/canopy-network/canopy/lib" + "github.com/canopy-network/canopy/store" "github.com/gorilla/websocket" "github.com/julienschmidt/httprouter" ) @@ -33,6 +35,7 @@ const ( // RCManager handles a group of root-chain sock clients type RCManager struct { c lib.Config // the global node config + controller *controller.Controller // reference to controller for state access subscriptions map[uint64]*RCSubscription // chainId -> subscription subscribers map[uint64][]*RCSubscriber // chainId -> subscribers l *sync.Mutex // thread safety @@ -47,6 +50,10 @@ type RCManager struct { maxRCSubscribers int maxRCSubscribersPerChain int subscriberCount int + // block data subscribers + blockDataSubscribers []*BlockDataSubscriber + blockDataSubscriberCount int + maxBlockDataSubscribers int } // NewRCManager() constructs a new instance of a RCManager @@ -78,6 +85,7 @@ func NewRCManager(controller *controller.Controller, config lib.Config, logger l // create the manager manager = &RCManager{ c: config, + controller: controller, subscriptions: make(map[uint64]*RCSubscription), subscribers: make(map[uint64][]*RCSubscriber), l: controller.Mutex, @@ -90,6 +98,8 @@ func NewRCManager(controller *controller.Controller, config lib.Config, logger l rcSubscriberPingPeriod: pingPeriod, maxRCSubscribers: maxSubscribers, maxRCSubscribersPerChain: maxSubscribersPerChain, + blockDataSubscribers: make([]*BlockDataSubscriber, 0), + maxBlockDataSubscribers: maxSubscribers, // reuse same limit } // set the manager in the controller controller.RCManager = manager @@ -128,6 +138,233 @@ func (r *RCManager) Publish(chainId uint64, info *lib.RootChainInfo) { } } +// buildIndexerSnapshot creates a lib.IndexerSnapshot protobuf for the given height +// This is used to send state snapshots over WebSocket alongside root chain info +func (r *RCManager) buildIndexerSnapshot(height uint64) (*lib.IndexerSnapshot, error) { + // Setup store for indexed data (blocks, txs, events) + db := r.controller.FSM.Store().(lib.StoreI).DB() + st, err := store.NewStoreWithDB(r.c, db, nil, r.log) + if err != nil { + return nil, err + } + defer st.Discard() + + if height == 0 { + height = st.Version() - 1 + } + prevHeight := height - 1 + + // Get state machines for current and previous height + smCurrent, err := r.controller.FSM.TimeMachine(height) + if err != nil { + return nil, err + } + defer smCurrent.Discard() + + smPrevious, err := r.controller.FSM.TimeMachine(prevHeight) + if err != nil { + return nil, err + } + defer smPrevious.Discard() + + snapshot := &lib.IndexerSnapshot{Height: height} + + // Fetch block data from indexer (errors result in nil, not failure) + var blockErr error + snapshot.Block, blockErr = st.GetBlockByHeight(height) + if blockErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetBlockByHeight failed for height %d: %s", height, blockErr.Error()) + } + if txPage, txErr := st.GetTxsByHeight(height, true, lib.PageParams{}); txErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetTxsByHeight failed for height %d: %s", height, txErr.Error()) + } else if txPage != nil { + if txs, ok := txPage.Results.(*lib.TxResults); ok { + snapshot.Transactions = []*lib.TxResult(*txs) + } + } + if evtPage, evtErr := st.GetEventsByBlockHeight(height, true, lib.PageParams{}); evtErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetEventsByBlockHeight failed for height %d: %s", height, evtErr.Error()) + } else if evtPage != nil { + if evts, ok := evtPage.Results.(*lib.Events); ok { + snapshot.Events = []*lib.Event(*evts) + } + } + + // Fetch state data (current height) - serialize to bytes for proto + if accPage, accErr := smCurrent.GetAccountsPaginated(lib.PageParams{}); accErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetAccountsPaginated failed for height %d: %s", height, accErr.Error()) + } else if accPage != nil { + if accs, ok := accPage.Results.(*fsm.AccountPage); ok { + snapshot.Accounts = make([][]byte, len(*accs)) + for i, acc := range *accs { + if data, marshalErr := lib.Marshal(acc); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal account[%d] failed for height %d: %s", i, height, marshalErr.Error()) + } else { + snapshot.Accounts[i] = data + } + } + } + } + if orders, ordersErr := smCurrent.GetOrderBooks(); ordersErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetOrderBooks failed for height %d: %s", height, ordersErr.Error()) + } else { + snapshot.Orders = orders + } + if prices, pricesErr := smCurrent.GetDexPrices(); pricesErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetDexPrices failed for height %d: %s", height, pricesErr.Error()) + } else if prices != nil { + snapshot.DexPrices = make([][]byte, len(prices)) + for i, p := range prices { + if data, marshalErr := lib.Marshal(p); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal dex price[%d] failed for height %d: %s", i, height, marshalErr.Error()) + } else { + snapshot.DexPrices[i] = data + } + } + } + if params, paramsErr := smCurrent.GetParams(); paramsErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetParams failed for height %d: %s", height, paramsErr.Error()) + } else if params != nil { + if data, marshalErr := lib.Marshal(params); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal params failed for height %d: %s", height, marshalErr.Error()) + } else { + snapshot.Params = data + } + } + if supply, supplyErr := smCurrent.GetSupply(); supplyErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetSupply failed for height %d: %s", height, supplyErr.Error()) + } else if supply != nil { + if data, marshalErr := lib.Marshal(supply); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal supply failed for height %d: %s", height, marshalErr.Error()) + } else { + snapshot.Supply = data + } + } + if committeesData, cdErr := smCurrent.GetCommitteesData(); cdErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetCommitteesData failed for height %d: %s", height, cdErr.Error()) + } else { + snapshot.CommitteesData = committeesData + } + if subsidized, subErr := smCurrent.GetSubsidizedCommittees(); subErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetSubsidizedCommittees failed for height %d: %s", height, subErr.Error()) + } else { + snapshot.SubsidizedCommittees = subsidized + } + if retired, retErr := smCurrent.GetRetiredCommittees(); retErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetRetiredCommittees failed for height %d: %s", height, retErr.Error()) + } else { + snapshot.RetiredCommittees = retired + } + + // Change detection pairs (current + H-1) - serialize validators/pools to bytes + if valPage, valErr := smCurrent.GetValidatorsPaginated(lib.PageParams{}, lib.ValidatorFilters{}); valErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetValidatorsPaginated (current) failed for height %d: %s", height, valErr.Error()) + } else if valPage != nil { + if vals, ok := valPage.Results.(*fsm.ValidatorPage); ok { + snapshot.ValidatorsCurrent = make([][]byte, len(*vals)) + for i, v := range *vals { + if data, marshalErr := lib.Marshal(v); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal validator current[%d] failed for height %d: %s", i, height, marshalErr.Error()) + } else { + snapshot.ValidatorsCurrent[i] = data + } + } + } + } + if valPage, valErr := smPrevious.GetValidatorsPaginated(lib.PageParams{}, lib.ValidatorFilters{}); valErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetValidatorsPaginated (previous) failed for height %d: %s", prevHeight, valErr.Error()) + } else if valPage != nil { + if vals, ok := valPage.Results.(*fsm.ValidatorPage); ok { + snapshot.ValidatorsPrevious = make([][]byte, len(*vals)) + for i, v := range *vals { + if data, marshalErr := lib.Marshal(v); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal validator previous[%d] failed for height %d: %s", i, prevHeight, marshalErr.Error()) + } else { + snapshot.ValidatorsPrevious[i] = data + } + } + } + } + + if poolPage, poolErr := smCurrent.GetPoolsPaginated(lib.PageParams{}); poolErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetPoolsPaginated (current) failed for height %d: %s", height, poolErr.Error()) + } else if poolPage != nil { + if pools, ok := poolPage.Results.(*fsm.PoolPage); ok { + snapshot.PoolsCurrent = make([][]byte, len(*pools)) + for i, p := range *pools { + if data, marshalErr := lib.Marshal(p); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal pool current[%d] failed for height %d: %s", i, height, marshalErr.Error()) + } else { + snapshot.PoolsCurrent[i] = data + } + } + } + } + if poolPage, poolErr := smPrevious.GetPoolsPaginated(lib.PageParams{}); poolErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetPoolsPaginated (previous) failed for height %d: %s", prevHeight, poolErr.Error()) + } else if poolPage != nil { + if pools, ok := poolPage.Results.(*fsm.PoolPage); ok { + snapshot.PoolsPrevious = make([][]byte, len(*pools)) + for i, p := range *pools { + if data, marshalErr := lib.Marshal(p); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal pool previous[%d] failed for height %d: %s", i, prevHeight, marshalErr.Error()) + } else { + snapshot.PoolsPrevious[i] = data + } + } + } + } + + if ns, nsErr := smCurrent.GetNonSigners(); nsErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetNonSigners (current) failed for height %d: %s", height, nsErr.Error()) + } else if ns != nil { + if data, marshalErr := lib.Marshal(ns); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal non-signers current failed for height %d: %s", height, marshalErr.Error()) + } else { + snapshot.NonSignersCurrent = data + } + } + if ns, nsErr := smPrevious.GetNonSigners(); nsErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetNonSigners (previous) failed for height %d: %s", prevHeight, nsErr.Error()) + } else if ns != nil { + if data, marshalErr := lib.Marshal(ns); marshalErr != nil { + r.log.Warnf("buildIndexerSnapshot: Marshal non-signers previous failed for height %d: %s", prevHeight, marshalErr.Error()) + } else { + snapshot.NonSignersPrevious = data + } + } + + if doubleSigners, dsErr := st.GetDoubleSigners(); dsErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetDoubleSigners failed for height %d: %s", height, dsErr.Error()) + } else { + snapshot.DoubleSignersCurrent = doubleSigners + } + + if dexBatches, dbErr := smCurrent.GetDexBatches(true); dbErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetDexBatches (current, confirmed) failed for height %d: %s", height, dbErr.Error()) + } else { + snapshot.DexBatchesCurrent = dexBatches + } + if dexBatches, dbErr := smPrevious.GetDexBatches(true); dbErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetDexBatches (previous, confirmed) failed for height %d: %s", prevHeight, dbErr.Error()) + } else { + snapshot.DexBatchesPrevious = dexBatches + } + + if nextDexBatches, ndbErr := smCurrent.GetDexBatches(false); ndbErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetDexBatches (current, next) failed for height %d: %s", height, ndbErr.Error()) + } else { + snapshot.NextDexBatchesCurrent = nextDexBatches + } + if nextDexBatches, ndbErr := smPrevious.GetDexBatches(false); ndbErr != nil { + r.log.Warnf("buildIndexerSnapshot: GetDexBatches (previous, next) failed for height %d: %s", prevHeight, ndbErr.Error()) + } else { + snapshot.NextDexBatchesPrevious = nextDexBatches + } + + return snapshot, nil +} + // ChainIds() returns a list of chainIds for subscribers func (r *RCManager) ChainIds() (list []uint64) { // de-duplicate the results @@ -618,3 +855,157 @@ func (r *RCSubscriber) Stop(err error) { // remove from the manager r.manager.RemoveSubscriber(r.chainId, r) } + +// BlockDataSubscriber represents a WebSocket client subscribed to block data updates +type BlockDataSubscriber struct { + conn *websocket.Conn // the underlying ws connection + manager *RCManager // reference to manager + log lib.LoggerI // stdout log + writeMu sync.Mutex // protects concurrent writes +} + +// BlockDataWebSocket() upgrades a http request to a websockets connection for block data +func (s *Server) BlockDataWebSocket(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + _ = w.(http.Hijacker) + // upgrade the connection to websockets + conn, err := s.rcManager.upgrader.Upgrade(w, r, nil) + if err != nil { + write(w, err, http.StatusInternalServerError) + s.logger.Error(err.Error()) + return + } + // create a new block data subscriber + subscriber := &BlockDataSubscriber{ + conn: conn, + manager: s.rcManager, + log: s.logger, + } + // add the subscriber to the manager + if err := s.rcManager.AddBlockDataSubscriber(subscriber); err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + if closeErr := conn.Close(); closeErr != nil { + s.logger.Error(closeErr.Error()) + } + return + } + // send initial snapshot at current height + go subscriber.sendInitialSnapshot() + // start the subscriber lifecycle + subscriber.Start() +} + +// sendInitialSnapshot sends the current block data snapshot when a client first connects +func (b *BlockDataSubscriber) sendInitialSnapshot() { + snapshot, err := b.manager.buildIndexerSnapshot(0) // 0 means latest height + if err != nil { + b.log.Warnf("BlockDataSubscriber: failed to build initial snapshot: %s", err.Error()) + return + } + protoBytes, err := lib.Marshal(snapshot) + if err != nil { + b.log.Warnf("BlockDataSubscriber: failed to marshal initial snapshot: %s", err.Error()) + return + } + if err := b.writeMessage(websocket.BinaryMessage, protoBytes); err != nil { + b.Stop(err) + } +} + +// AddBlockDataSubscriber adds a block data subscriber to the manager +func (r *RCManager) AddBlockDataSubscriber(subscriber *BlockDataSubscriber) error { + r.l.Lock() + defer r.l.Unlock() + if r.maxBlockDataSubscribers > 0 && r.blockDataSubscriberCount >= r.maxBlockDataSubscribers { + return fmt.Errorf("block data subscriber limit reached") + } + r.blockDataSubscribers = append(r.blockDataSubscribers, subscriber) + r.blockDataSubscriberCount++ + return nil +} + +// RemoveBlockDataSubscriber removes a block data subscriber from the manager +func (r *RCManager) RemoveBlockDataSubscriber(subscriber *BlockDataSubscriber) { + r.l.Lock() + defer r.l.Unlock() + before := len(r.blockDataSubscribers) + r.blockDataSubscribers = slices.DeleteFunc(r.blockDataSubscribers, func(sub *BlockDataSubscriber) bool { + return sub == subscriber + }) + if len(r.blockDataSubscribers) < before { + r.blockDataSubscriberCount-- + } +} + +// PublishBlockData sends the IndexerSnapshot to all block data subscribers +func (r *RCManager) PublishBlockData(height uint64) { + // build the snapshot + snapshot, err := r.buildIndexerSnapshot(height) + if err != nil { + r.log.Warnf("PublishBlockData: failed to build snapshot for height %d: %s", height, err.Error()) + return + } + // marshal to proto bytes + protoBytes, err := lib.Marshal(snapshot) + if err != nil { + r.log.Warnf("PublishBlockData: failed to marshal snapshot for height %d: %s", height, err.Error()) + return + } + // copy subscribers under lock to avoid map iteration races + r.l.Lock() + subscribers := append([]*BlockDataSubscriber(nil), r.blockDataSubscribers...) + r.l.Unlock() + // publish to each subscriber + for _, subscriber := range subscribers { + if e := subscriber.writeMessage(websocket.BinaryMessage, protoBytes); e != nil { + subscriber.Stop(e) + } + } +} + +// Start configures and starts block data subscriber lifecycle goroutines +func (b *BlockDataSubscriber) Start() { + b.conn.SetReadLimit(b.manager.rcSubscriberReadLimitBytes) + _ = b.conn.SetReadDeadline(time.Now().Add(b.manager.rcSubscriberPongWait)) + b.conn.SetPongHandler(func(string) error { + _ = b.conn.SetReadDeadline(time.Now().Add(b.manager.rcSubscriberPongWait)) + return nil + }) + go b.readLoop() + go b.pingLoop() +} + +func (b *BlockDataSubscriber) readLoop() { + for { + if _, _, err := b.conn.ReadMessage(); err != nil { + b.Stop(err) + return + } + } +} + +func (b *BlockDataSubscriber) pingLoop() { + ticker := time.NewTicker(b.manager.rcSubscriberPingPeriod) + defer ticker.Stop() + for range ticker.C { + if err := b.writeMessage(websocket.PingMessage, nil); err != nil { + b.Stop(err) + return + } + } +} + +func (b *BlockDataSubscriber) writeMessage(messageType int, data []byte) error { + b.writeMu.Lock() + defer b.writeMu.Unlock() + _ = b.conn.SetWriteDeadline(time.Now().Add(b.manager.rcSubscriberWriteTimeout)) + return b.conn.WriteMessage(messageType, data) +} + +// Stop stops the block data subscriber +func (b *BlockDataSubscriber) Stop(err error) { + b.log.Errorf("BlockData WS Failed with err: %s", err.Error()) + if err = b.conn.Close(); err != nil { + b.log.Error(err.Error()) + } + b.manager.RemoveBlockDataSubscriber(b) +} diff --git a/controller/block.go b/controller/block.go index 625918e4..dc47d8e9 100644 --- a/controller/block.go +++ b/controller/block.go @@ -318,6 +318,8 @@ func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Blo // publish root chain information go c.RCManager.Publish(id, info) } + // publish block data to all block data subscribers + go c.RCManager.PublishBlockData(block.BlockHeader.Height) // exit return } @@ -411,6 +413,8 @@ func (c *Controller) CommitCertificateParallel(qc *lib.QuorumCertificate, block info.Timestamp = ts go c.RCManager.Publish(id, info) } + // publish block data to all block data subscribers + go c.RCManager.PublishBlockData(block.BlockHeader.Height) // exit return nil }) diff --git a/fsm/dex_test.go b/fsm/dex_test.go index f4e84a31..1c4c631a 100644 --- a/fsm/dex_test.go +++ b/fsm/dex_test.go @@ -3471,6 +3471,8 @@ func (m *MockRCManager) SetDexBatch(rootChainId, height, committee uint64, batch // RCManagerI interface implementation func (m *MockRCManager) Publish(chainId uint64, info *lib.RootChainInfo) {} +func (m *MockRCManager) PublishBlockData(height uint64) {} + func (m *MockRCManager) ChainIds() []uint64 { return []uint64{1, 2} } func (m *MockRCManager) GetHeight(rootChainId uint64) uint64 { return 100 } diff --git a/lib/.proto/block.proto b/lib/.proto/block.proto index 6c3f3239..090adda5 100644 --- a/lib/.proto/block.proto +++ b/lib/.proto/block.proto @@ -7,6 +7,8 @@ import "tx.proto"; import "certificate.proto"; import "crypto.proto"; import "event.proto"; +import "swap.proto"; +import "dex.proto"; // ***************************************************************************************************** // This file is auto-generated from source files in `/lib/.proto/*` using Protocol Buffers (protobuf) @@ -100,3 +102,37 @@ message BlockResultMeta { // took: duration string of the block in milliseconds uint64 took = 2; } + +// IndexerSnapshot is a comprehensive state snapshot for indexers sent over WebSocket +// Uses raw slices instead of Page wrappers for proto compatibility +message IndexerSnapshot { + uint64 height = 1; + + // Block data + BlockResult block = 2; + repeated TxResult transactions = 3; + repeated Event events = 4; + + // State data (current height) + repeated bytes accounts = 5; // serialized Account messages + OrderBooks orders = 6; + repeated bytes dex_prices = 7; // serialized DexPrice structs @gotags: json:"dexPrices" + bytes params = 8; // serialized Params message + bytes supply = 9; // serialized Supply message + CommitteesData committees_data = 10; // @gotags: json:"committeesData" + repeated uint64 subsidized_committees = 11; // @gotags: json:"subsidizedCommittees" + repeated uint64 retired_committees = 12; // @gotags: json:"retiredCommittees" + + // Change detection pairs (current + previous) + repeated bytes validators_current = 13; // @gotags: json:"validatorsCurrent" + repeated bytes validators_previous = 14; // @gotags: json:"validatorsPrevious" + repeated bytes pools_current = 15; // @gotags: json:"poolsCurrent" + repeated bytes pools_previous = 16; // @gotags: json:"poolsPrevious" + bytes non_signers_current = 17; // @gotags: json:"nonSignersCurrent" + bytes non_signers_previous = 18; // @gotags: json:"nonSignersPrevious" + repeated DoubleSigner double_signers_current = 19; // @gotags: json:"doubleSignersCurrent" + repeated DexBatch dex_batches_current = 20; // @gotags: json:"dexBatchesCurrent" + repeated DexBatch dex_batches_previous = 21; // @gotags: json:"dexBatchesPrevious" + repeated DexBatch next_dex_batches_current = 22; // @gotags: json:"nextDexBatchesCurrent" + repeated DexBatch next_dex_batches_previous = 23; // @gotags: json:"nextDexBatchesPrevious" +} diff --git a/lib/block.pb.go b/lib/block.pb.go index 63daae6a..dab74dfc 100644 --- a/lib/block.pb.go +++ b/lib/block.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v5.29.3 +// protoc v6.33.1 // source: block.proto package lib @@ -408,11 +408,237 @@ func (x *BlockResultMeta) GetTook() uint64 { return 0 } +// IndexerSnapshot is a comprehensive state snapshot for indexers sent over WebSocket +// Uses raw slices instead of Page wrappers for proto compatibility +type IndexerSnapshot struct { + state protoimpl.MessageState `protogen:"open.v1"` + Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + // Block data + Block *BlockResult `protobuf:"bytes,2,opt,name=block,proto3" json:"block,omitempty"` + Transactions []*TxResult `protobuf:"bytes,3,rep,name=transactions,proto3" json:"transactions,omitempty"` + Events []*Event `protobuf:"bytes,4,rep,name=events,proto3" json:"events,omitempty"` + // State data (current height) + Accounts [][]byte `protobuf:"bytes,5,rep,name=accounts,proto3" json:"accounts,omitempty"` // serialized Account messages + Orders *OrderBooks `protobuf:"bytes,6,opt,name=orders,proto3" json:"orders,omitempty"` + DexPrices [][]byte `protobuf:"bytes,7,rep,name=dex_prices,json=dexPrices,proto3" json:"dexPrices"` // serialized DexPrice structs @gotags: json:"dexPrices" + Params []byte `protobuf:"bytes,8,opt,name=params,proto3" json:"params,omitempty"` // serialized Params message + Supply []byte `protobuf:"bytes,9,opt,name=supply,proto3" json:"supply,omitempty"` // serialized Supply message + CommitteesData *CommitteesData `protobuf:"bytes,10,opt,name=committees_data,json=committeesData,proto3" json:"committeesData"` // @gotags: json:"committeesData" + SubsidizedCommittees []uint64 `protobuf:"varint,11,rep,packed,name=subsidized_committees,json=subsidizedCommittees,proto3" json:"subsidizedCommittees"` // @gotags: json:"subsidizedCommittees" + RetiredCommittees []uint64 `protobuf:"varint,12,rep,packed,name=retired_committees,json=retiredCommittees,proto3" json:"retiredCommittees"` // @gotags: json:"retiredCommittees" + // Change detection pairs (current + previous) + ValidatorsCurrent [][]byte `protobuf:"bytes,13,rep,name=validators_current,json=validatorsCurrent,proto3" json:"validatorsCurrent"` // @gotags: json:"validatorsCurrent" + ValidatorsPrevious [][]byte `protobuf:"bytes,14,rep,name=validators_previous,json=validatorsPrevious,proto3" json:"validatorsPrevious"` // @gotags: json:"validatorsPrevious" + PoolsCurrent [][]byte `protobuf:"bytes,15,rep,name=pools_current,json=poolsCurrent,proto3" json:"poolsCurrent"` // @gotags: json:"poolsCurrent" + PoolsPrevious [][]byte `protobuf:"bytes,16,rep,name=pools_previous,json=poolsPrevious,proto3" json:"poolsPrevious"` // @gotags: json:"poolsPrevious" + NonSignersCurrent []byte `protobuf:"bytes,17,opt,name=non_signers_current,json=nonSignersCurrent,proto3" json:"nonSignersCurrent"` // @gotags: json:"nonSignersCurrent" + NonSignersPrevious []byte `protobuf:"bytes,18,opt,name=non_signers_previous,json=nonSignersPrevious,proto3" json:"nonSignersPrevious"` // @gotags: json:"nonSignersPrevious" + DoubleSignersCurrent []*DoubleSigner `protobuf:"bytes,19,rep,name=double_signers_current,json=doubleSignersCurrent,proto3" json:"doubleSignersCurrent"` // @gotags: json:"doubleSignersCurrent" + DexBatchesCurrent []*DexBatch `protobuf:"bytes,20,rep,name=dex_batches_current,json=dexBatchesCurrent,proto3" json:"dexBatchesCurrent"` // @gotags: json:"dexBatchesCurrent" + DexBatchesPrevious []*DexBatch `protobuf:"bytes,21,rep,name=dex_batches_previous,json=dexBatchesPrevious,proto3" json:"dexBatchesPrevious"` // @gotags: json:"dexBatchesPrevious" + NextDexBatchesCurrent []*DexBatch `protobuf:"bytes,22,rep,name=next_dex_batches_current,json=nextDexBatchesCurrent,proto3" json:"nextDexBatchesCurrent"` // @gotags: json:"nextDexBatchesCurrent" + NextDexBatchesPrevious []*DexBatch `protobuf:"bytes,23,rep,name=next_dex_batches_previous,json=nextDexBatchesPrevious,proto3" json:"nextDexBatchesPrevious"` // @gotags: json:"nextDexBatchesPrevious" + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *IndexerSnapshot) Reset() { + *x = IndexerSnapshot{} + mi := &file_block_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *IndexerSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IndexerSnapshot) ProtoMessage() {} + +func (x *IndexerSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_block_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IndexerSnapshot.ProtoReflect.Descriptor instead. +func (*IndexerSnapshot) Descriptor() ([]byte, []int) { + return file_block_proto_rawDescGZIP(), []int{4} +} + +func (x *IndexerSnapshot) GetHeight() uint64 { + if x != nil { + return x.Height + } + return 0 +} + +func (x *IndexerSnapshot) GetBlock() *BlockResult { + if x != nil { + return x.Block + } + return nil +} + +func (x *IndexerSnapshot) GetTransactions() []*TxResult { + if x != nil { + return x.Transactions + } + return nil +} + +func (x *IndexerSnapshot) GetEvents() []*Event { + if x != nil { + return x.Events + } + return nil +} + +func (x *IndexerSnapshot) GetAccounts() [][]byte { + if x != nil { + return x.Accounts + } + return nil +} + +func (x *IndexerSnapshot) GetOrders() *OrderBooks { + if x != nil { + return x.Orders + } + return nil +} + +func (x *IndexerSnapshot) GetDexPrices() [][]byte { + if x != nil { + return x.DexPrices + } + return nil +} + +func (x *IndexerSnapshot) GetParams() []byte { + if x != nil { + return x.Params + } + return nil +} + +func (x *IndexerSnapshot) GetSupply() []byte { + if x != nil { + return x.Supply + } + return nil +} + +func (x *IndexerSnapshot) GetCommitteesData() *CommitteesData { + if x != nil { + return x.CommitteesData + } + return nil +} + +func (x *IndexerSnapshot) GetSubsidizedCommittees() []uint64 { + if x != nil { + return x.SubsidizedCommittees + } + return nil +} + +func (x *IndexerSnapshot) GetRetiredCommittees() []uint64 { + if x != nil { + return x.RetiredCommittees + } + return nil +} + +func (x *IndexerSnapshot) GetValidatorsCurrent() [][]byte { + if x != nil { + return x.ValidatorsCurrent + } + return nil +} + +func (x *IndexerSnapshot) GetValidatorsPrevious() [][]byte { + if x != nil { + return x.ValidatorsPrevious + } + return nil +} + +func (x *IndexerSnapshot) GetPoolsCurrent() [][]byte { + if x != nil { + return x.PoolsCurrent + } + return nil +} + +func (x *IndexerSnapshot) GetPoolsPrevious() [][]byte { + if x != nil { + return x.PoolsPrevious + } + return nil +} + +func (x *IndexerSnapshot) GetNonSignersCurrent() []byte { + if x != nil { + return x.NonSignersCurrent + } + return nil +} + +func (x *IndexerSnapshot) GetNonSignersPrevious() []byte { + if x != nil { + return x.NonSignersPrevious + } + return nil +} + +func (x *IndexerSnapshot) GetDoubleSignersCurrent() []*DoubleSigner { + if x != nil { + return x.DoubleSignersCurrent + } + return nil +} + +func (x *IndexerSnapshot) GetDexBatchesCurrent() []*DexBatch { + if x != nil { + return x.DexBatchesCurrent + } + return nil +} + +func (x *IndexerSnapshot) GetDexBatchesPrevious() []*DexBatch { + if x != nil { + return x.DexBatchesPrevious + } + return nil +} + +func (x *IndexerSnapshot) GetNextDexBatchesCurrent() []*DexBatch { + if x != nil { + return x.NextDexBatchesCurrent + } + return nil +} + +func (x *IndexerSnapshot) GetNextDexBatchesPrevious() []*DexBatch { + if x != nil { + return x.NextDexBatchesPrevious + } + return nil +} + var File_block_proto protoreflect.FileDescriptor const file_block_proto_rawDesc = "" + "\n" + - "\vblock.proto\x12\x05types\x1a\btx.proto\x1a\x11certificate.proto\x1a\fcrypto.proto\x1a\vevent.proto\"\xb8\x04\n" + + "\vblock.proto\x12\x05types\x1a\btx.proto\x1a\x11certificate.proto\x1a\fcrypto.proto\x1a\vevent.proto\x1a\n" + + "swap.proto\x1a\tdex.proto\"\xb8\x04\n" + "\vBlockHeader\x12\x16\n" + "\x06height\x18\x01 \x01(\x04R\x06height\x12\x12\n" + "\x04hash\x18\x02 \x01(\fR\x04hash\x12\x1d\n" + @@ -443,7 +669,33 @@ const file_block_proto_rawDesc = "" + "\x04meta\x18\x04 \x01(\v2\x16.types.BlockResultMetaR\x04meta\"9\n" + "\x0fBlockResultMeta\x12\x12\n" + "\x04size\x18\x01 \x01(\x04R\x04size\x12\x12\n" + - "\x04took\x18\x02 \x01(\x04R\x04tookB&Z$github.com/canopy-network/canopy/libb\x06proto3" + "\x04took\x18\x02 \x01(\x04R\x04took\"\xdb\b\n" + + "\x0fIndexerSnapshot\x12\x16\n" + + "\x06height\x18\x01 \x01(\x04R\x06height\x12(\n" + + "\x05block\x18\x02 \x01(\v2\x12.types.BlockResultR\x05block\x123\n" + + "\ftransactions\x18\x03 \x03(\v2\x0f.types.TxResultR\ftransactions\x12$\n" + + "\x06events\x18\x04 \x03(\v2\f.types.EventR\x06events\x12\x1a\n" + + "\baccounts\x18\x05 \x03(\fR\baccounts\x12)\n" + + "\x06orders\x18\x06 \x01(\v2\x11.types.OrderBooksR\x06orders\x12\x1d\n" + + "\n" + + "dex_prices\x18\a \x03(\fR\tdexPrices\x12\x16\n" + + "\x06params\x18\b \x01(\fR\x06params\x12\x16\n" + + "\x06supply\x18\t \x01(\fR\x06supply\x12>\n" + + "\x0fcommittees_data\x18\n" + + " \x01(\v2\x15.types.CommitteesDataR\x0ecommitteesData\x123\n" + + "\x15subsidized_committees\x18\v \x03(\x04R\x14subsidizedCommittees\x12-\n" + + "\x12retired_committees\x18\f \x03(\x04R\x11retiredCommittees\x12-\n" + + "\x12validators_current\x18\r \x03(\fR\x11validatorsCurrent\x12/\n" + + "\x13validators_previous\x18\x0e \x03(\fR\x12validatorsPrevious\x12#\n" + + "\rpools_current\x18\x0f \x03(\fR\fpoolsCurrent\x12%\n" + + "\x0epools_previous\x18\x10 \x03(\fR\rpoolsPrevious\x12.\n" + + "\x13non_signers_current\x18\x11 \x01(\fR\x11nonSignersCurrent\x120\n" + + "\x14non_signers_previous\x18\x12 \x01(\fR\x12nonSignersPrevious\x12I\n" + + "\x16double_signers_current\x18\x13 \x03(\v2\x13.types.DoubleSignerR\x14doubleSignersCurrent\x12?\n" + + "\x13dex_batches_current\x18\x14 \x03(\v2\x0f.types.DexBatchR\x11dexBatchesCurrent\x12A\n" + + "\x14dex_batches_previous\x18\x15 \x03(\v2\x0f.types.DexBatchR\x12dexBatchesPrevious\x12H\n" + + "\x18next_dex_batches_current\x18\x16 \x03(\v2\x0f.types.DexBatchR\x15nextDexBatchesCurrent\x12J\n" + + "\x19next_dex_batches_previous\x18\x17 \x03(\v2\x0f.types.DexBatchR\x16nextDexBatchesPreviousB&Z$github.com/canopy-network/canopy/libb\x06proto3" var ( file_block_proto_rawDescOnce sync.Once @@ -457,30 +709,45 @@ func file_block_proto_rawDescGZIP() []byte { return file_block_proto_rawDescData } -var file_block_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_block_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_block_proto_goTypes = []any{ (*BlockHeader)(nil), // 0: types.BlockHeader (*Block)(nil), // 1: types.Block (*BlockResult)(nil), // 2: types.BlockResult (*BlockResultMeta)(nil), // 3: types.BlockResultMeta - (*crypto.VDF)(nil), // 4: types.VDF - (*QuorumCertificate)(nil), // 5: types.QuorumCertificate - (*TxResult)(nil), // 6: types.TxResult - (*Event)(nil), // 7: types.Event + (*IndexerSnapshot)(nil), // 4: types.IndexerSnapshot + (*crypto.VDF)(nil), // 5: types.VDF + (*QuorumCertificate)(nil), // 6: types.QuorumCertificate + (*TxResult)(nil), // 7: types.TxResult + (*Event)(nil), // 8: types.Event + (*OrderBooks)(nil), // 9: types.OrderBooks + (*CommitteesData)(nil), // 10: types.CommitteesData + (*DoubleSigner)(nil), // 11: types.DoubleSigner + (*DexBatch)(nil), // 12: types.DexBatch } var file_block_proto_depIdxs = []int32{ - 4, // 0: types.BlockHeader.vdf:type_name -> types.VDF - 5, // 1: types.BlockHeader.last_quorum_certificate:type_name -> types.QuorumCertificate - 0, // 2: types.Block.block_header:type_name -> types.BlockHeader - 0, // 3: types.BlockResult.block_header:type_name -> types.BlockHeader - 6, // 4: types.BlockResult.transactions:type_name -> types.TxResult - 7, // 5: types.BlockResult.events:type_name -> types.Event - 3, // 6: types.BlockResult.meta:type_name -> types.BlockResultMeta - 7, // [7:7] is the sub-list for method output_type - 7, // [7:7] is the sub-list for method input_type - 7, // [7:7] is the sub-list for extension type_name - 7, // [7:7] is the sub-list for extension extendee - 0, // [0:7] is the sub-list for field type_name + 5, // 0: types.BlockHeader.vdf:type_name -> types.VDF + 6, // 1: types.BlockHeader.last_quorum_certificate:type_name -> types.QuorumCertificate + 0, // 2: types.Block.block_header:type_name -> types.BlockHeader + 0, // 3: types.BlockResult.block_header:type_name -> types.BlockHeader + 7, // 4: types.BlockResult.transactions:type_name -> types.TxResult + 8, // 5: types.BlockResult.events:type_name -> types.Event + 3, // 6: types.BlockResult.meta:type_name -> types.BlockResultMeta + 2, // 7: types.IndexerSnapshot.block:type_name -> types.BlockResult + 7, // 8: types.IndexerSnapshot.transactions:type_name -> types.TxResult + 8, // 9: types.IndexerSnapshot.events:type_name -> types.Event + 9, // 10: types.IndexerSnapshot.orders:type_name -> types.OrderBooks + 10, // 11: types.IndexerSnapshot.committees_data:type_name -> types.CommitteesData + 11, // 12: types.IndexerSnapshot.double_signers_current:type_name -> types.DoubleSigner + 12, // 13: types.IndexerSnapshot.dex_batches_current:type_name -> types.DexBatch + 12, // 14: types.IndexerSnapshot.dex_batches_previous:type_name -> types.DexBatch + 12, // 15: types.IndexerSnapshot.next_dex_batches_current:type_name -> types.DexBatch + 12, // 16: types.IndexerSnapshot.next_dex_batches_previous:type_name -> types.DexBatch + 17, // [17:17] is the sub-list for method output_type + 17, // [17:17] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_block_proto_init() } @@ -491,13 +758,15 @@ func file_block_proto_init() { file_tx_proto_init() file_certificate_proto_init() file_event_proto_init() + file_swap_proto_init() + file_dex_proto_init() type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_block_proto_rawDesc), len(file_block_proto_rawDesc)), NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/lib/consensus.go b/lib/consensus.go index 02d4754e..6021e1a3 100644 --- a/lib/consensus.go +++ b/lib/consensus.go @@ -101,6 +101,7 @@ func (vs *ValidatorSet) GetValidatorAndIdx(targetPublicKey []byte) (val *Consens // RootChainClient executes 'on-demand' calls to the root-chain type RCManagerI interface { Publish(chainId uint64, info *RootChainInfo) // publish the root chain info to nested chain listeners + PublishBlockData(height uint64) // publish block data to all block data subscribers ChainIds() []uint64 // get the list of chain ids of the nested chain subscribers GetHeight(rootChainId uint64) uint64 // get the height of the root chain GetRootChainInfo(rootChainId, chainId uint64) (rootChainInfo *RootChainInfo, err ErrorI) // get root-chain info 'on-demand'