Skip to content

Commit dbe6202

Browse files
galt-troskarszoonicellan
authored
Expand peer registry usage and improve reputation system for catchup (#20)
Co-authored-by: oskarszoon <[email protected]> Co-authored-by: Siggi <[email protected]> Co-authored-by: Siggi <[email protected]>
1 parent 956fd90 commit dbe6202

File tree

151 files changed

+12623
-6011
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

151 files changed

+12623
-6011
lines changed

CLAUDE.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,53 @@ The project includes a Bitcoin expert agent (`.claude/agents/bitcoin-expert.md`)
193193
- Don't use mock blockchain client/store - you can use a real one using the sqlitememory store
194194
- Don't use mock kafka - you can use in_memory_kafka.go
195195

196+
### Service Interface Design Pattern
197+
198+
When creating or updating service interfaces and clients, follow this pattern to avoid exposing protobuf/gRPC types:
199+
200+
**Interface Layer** (`Interface.go`):
201+
- Define interfaces using native Go types and existing domain types (e.g., `*PeerInfo`, `[]string`, `bool`, `error`)
202+
- Do NOT expose protobuf types (e.g., `*p2p_api.GetPeersResponse`) in interface signatures
203+
- Use simple, idiomatic Go return types: `error` for success/fail, `bool` for yes/no, `[]string` for lists
204+
- Prefer existing domain structs over creating new minimal types
205+
206+
**Client Layer** (`Client.go`):
207+
- Keep the protobuf/gRPC import for internal use (e.g., `import "github.com/bsv-blockchain/teranode/services/p2p/p2p_api"`)
208+
- Maintain internal gRPC client field (e.g., `client p2p_api.PeerServiceClient`)
209+
- Public methods match the interface signatures (native types)
210+
- Convert between native types and protobuf types internally using helper functions
211+
212+
**Benefits**:
213+
- Cleaner API boundaries between services
214+
- Reduces coupling to gRPC implementation details
215+
- Makes interfaces more testable (no protobuf dependencies needed for mocks)
216+
- Uses idiomatic Go types that are easier to work with
217+
218+
**Example**:
219+
```go
220+
// Interface.go - Clean, no protobuf types
221+
type ClientI interface {
222+
GetPeers(ctx context.Context) ([]*PeerInfo, error)
223+
BanPeer(ctx context.Context, peerID string, duration int64, reason string) error
224+
IsBanned(ctx context.Context, peerID string) (bool, error)
225+
ListBanned(ctx context.Context) ([]string, error)
226+
}
227+
228+
// Client.go - Internal conversion
229+
type Client struct {
230+
client p2p_api.PeerServiceClient // gRPC client
231+
}
232+
233+
func (c *Client) GetPeers(ctx context.Context) ([]*PeerInfo, error) {
234+
resp, err := c.client.GetPeers(ctx, &emptypb.Empty{})
235+
if err != nil {
236+
return nil, err
237+
}
238+
// Convert p2p_api types to native PeerInfo
239+
return convertFromAPIResponse(resp), nil
240+
}
241+
```
242+
196243
## Git Workflow (Fork Mode)
197244

198245
All developers work in forked repositories with `upstream` remote pointing to the original repo.

cmd/filereader/file_reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func handleSubtreeData(br *bufio.Reader, logger ulogger.Logger, settings *settin
212212
return errors.NewProcessingError("error reading subtree", err)
213213
}
214214

215-
var sd *subtree.SubtreeData
215+
var sd *subtree.Data
216216

217217
sd, err = subtree.NewSubtreeDataFromReader(st, br)
218218
if err != nil {
@@ -258,7 +258,7 @@ func handleSubtreeMeta(br *bufio.Reader, logger ulogger.Logger, settings *settin
258258
return errors.NewProcessingError("error reading subtree", err)
259259
}
260260

261-
var subtreeMeta *subtree.SubtreeMeta
261+
var subtreeMeta *subtree.Meta
262262

263263
subtreeMeta, err = subtree.NewSubtreeMetaFromReader(st, br)
264264
if err != nil {

daemon/daemon.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ const (
7676
loggerTransactions = "txs"
7777
loggerTxValidator = "txval"
7878
loggerUtxos = "utxos"
79+
loggerAlert = "alert"
7980

8081
// Service names
8182
serviceAlert = "alert"

daemon/daemon_services.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,16 @@ func (d *Daemon) startAssetService(ctx context.Context, appSettings *settings.Se
401401
return err
402402
}
403403

404+
// Get the P2P client for the Asset service
405+
var p2pClient p2p.ClientI
406+
407+
p2pClient, err = d.daemonStores.GetP2PClient(
408+
ctx, createLogger(loggerP2P), appSettings,
409+
)
410+
if err != nil {
411+
return err
412+
}
413+
404414
// Initialize the Asset service with the necessary parts
405415
return d.ServiceManager.AddService(serviceAssetFormal, asset.NewServer(
406416
createLogger(serviceAsset),
@@ -411,6 +421,7 @@ func (d *Daemon) startAssetService(ctx context.Context, appSettings *settings.Se
411421
blockPersisterStore,
412422
blockchainClient,
413423
blockvalidationClient,
424+
p2pClient,
414425
))
415426
}
416427

@@ -433,25 +444,25 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
433444
return err
434445
}
435446

436-
blockAssemblyClient, err := blockassembly.NewClient(ctx, createLogger("ba"), appSettings)
447+
blockAssemblyClient, err := GetBlockAssemblyClient(ctx, createLogger("rpc"), appSettings)
437448
if err != nil {
438449
return err
439450
}
440451

441-
peerClient, err := peer.NewClient(ctx, createLogger("peer"), appSettings)
452+
peerClient, err := peer.NewClient(ctx, createLogger("rpc"), appSettings)
442453
if err != nil {
443454
return err
444455
}
445456

446-
p2pClient, err := p2p.NewClient(ctx, createLogger("p2p"), appSettings)
457+
p2pClient, err := d.daemonStores.GetP2PClient(ctx, createLogger("rpc"), appSettings)
447458
if err != nil {
448459
return err
449460
}
450461

451462
// Create block validation client for RPC service
452463
var blockValidationClient blockvalidation.Interface
453464

454-
blockValidationClient, err = d.daemonStores.GetBlockValidationClient(ctx, createLogger("blockvalidation"), appSettings)
465+
blockValidationClient, err = d.daemonStores.GetBlockValidationClient(ctx, createLogger("rpc"), appSettings)
455466
if err != nil {
456467
return err
457468
}
@@ -481,7 +492,7 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
481492
}
482493

483494
// Add the RPC service to the ServiceManager
484-
if err := d.ServiceManager.AddService(serviceRPCFormal, rpcServer); err != nil {
495+
if err = d.ServiceManager.AddService(serviceRPCFormal, rpcServer); err != nil {
485496
return err
486497
}
487498

@@ -490,43 +501,43 @@ func (d *Daemon) startRPCService(ctx context.Context, appSettings *settings.Sett
490501

491502
// startAlertService initializes and adds the Alert service to the ServiceManager.
492503
func (d *Daemon) startAlertService(ctx context.Context, appSettings *settings.Settings,
493-
createLogger func(string) ulogger.Logger) error {
504+
createLogger func(string) ulogger.Logger) (err error) {
505+
var (
506+
blockchainClient blockchain.ClientI
507+
utxoStore utxo.Store
508+
blockAssemblyClient blockassembly.ClientI
509+
peerClient peer.ClientI
510+
p2pClient p2p.ClientI
511+
)
512+
494513
// Create the blockchain client for the Alert service
495-
blockchainClient, err := d.daemonStores.GetBlockchainClient(
514+
blockchainClient, err = d.daemonStores.GetBlockchainClient(
496515
ctx, createLogger(loggerBlockchainClient), appSettings, serviceAlert,
497516
)
498517
if err != nil {
499518
return err
500519
}
501520

502521
// Create the UTXO store for the Alert service
503-
var utxoStore utxo.Store
504-
505-
utxoStore, err = d.daemonStores.GetUtxoStore(ctx, createLogger(loggerUtxos), appSettings)
522+
utxoStore, err = d.daemonStores.GetUtxoStore(ctx, createLogger(loggerAlert), appSettings)
506523
if err != nil {
507524
return err
508525
}
509526

510527
// Create the block assembly client for the Alert service
511-
var blockAssemblyClient *blockassembly.Client
512-
513-
blockAssemblyClient, err = blockassembly.NewClient(ctx, createLogger(loggerBlockAssembly), appSettings)
528+
blockAssemblyClient, err = GetBlockAssemblyClient(ctx, createLogger(loggerAlert), appSettings)
514529
if err != nil {
515530
return err
516531
}
517532

518533
// Create the peer client for the Alert service
519-
var peerClient peer.ClientI
520-
521-
peerClient, err = peer.NewClient(ctx, createLogger(loggerPeerClient), appSettings)
534+
peerClient, err = peer.NewClient(ctx, createLogger(loggerAlert), appSettings)
522535
if err != nil {
523536
return err
524537
}
525538

526539
// Create the P2P client for the Alert service
527-
var p2pClient p2p.ClientI
528-
529-
p2pClient, err = p2p.NewClient(ctx, createLogger(loggerP2P), appSettings)
540+
p2pClient, err = d.daemonStores.GetP2PClient(ctx, createLogger(loggerAlert), appSettings)
530541
if err != nil {
531542
return err
532543
}
@@ -762,6 +773,14 @@ func (d *Daemon) startValidationService(
762773
return err
763774
}
764775

776+
// Create the P2P client for the SubtreeValidation service
777+
var p2pClient p2p.ClientI
778+
779+
p2pClient, err = d.daemonStores.GetP2PClient(ctx, createLogger(loggerP2P), appSettings)
780+
if err != nil {
781+
return err
782+
}
783+
765784
// Create the SubtreeValidation service
766785
var service *subtreevalidation.Server
767786

@@ -776,6 +795,7 @@ func (d *Daemon) startValidationService(
776795
blockchainClient,
777796
subtreeConsumerClient,
778797
txMetaConsumerClient,
798+
p2pClient,
779799
)
780800
if err != nil {
781801
return err
@@ -810,6 +830,14 @@ func (d *Daemon) startValidationService(
810830
return err
811831
}
812832

833+
// Create the P2P client for the BlockValidation service
834+
var p2pClient p2p.ClientI
835+
836+
p2pClient, err = d.daemonStores.GetP2PClient(ctx, createLogger(loggerP2P), appSettings)
837+
if err != nil {
838+
return err
839+
}
840+
813841
// Create the BlockValidation service
814842
d.blockValidationSrv = blockvalidation.New(
815843
createLogger(loggerBlockValidation),
@@ -821,6 +849,7 @@ func (d *Daemon) startValidationService(
821849
blockchainClient,
822850
kafkaConsumerClient,
823851
blockAssemblyClient,
852+
p2pClient,
824853
)
825854

826855
// Add the BlockValidation service to the ServiceManager

daemon/daemon_stores.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/bsv-blockchain/teranode/services/blockassembly"
99
"github.com/bsv-blockchain/teranode/services/blockchain"
1010
"github.com/bsv-blockchain/teranode/services/blockvalidation"
11+
"github.com/bsv-blockchain/teranode/services/p2p"
1112
"github.com/bsv-blockchain/teranode/services/subtreevalidation"
1213
"github.com/bsv-blockchain/teranode/services/validator"
1314
"github.com/bsv-blockchain/teranode/settings"
@@ -24,6 +25,7 @@ type Stores struct {
2425
mainBlockPersisterStore blob.Store
2526
mainBlockStore blob.Store
2627
mainBlockValidationClient blockvalidation.Interface
28+
mainP2PClient p2p.ClientI
2729
mainSubtreeStore blob.Store
2830
mainSubtreeValidationClient subtreevalidation.Interface
2931
mainTempStore blob.Store
@@ -83,6 +85,33 @@ func (d *Stores) GetBlockValidationClient(ctx context.Context, logger ulogger.Lo
8385
return d.mainBlockValidationClient, err
8486
}
8587

88+
// GetP2PClient creates and returns a new P2P client instance. Unlike other store getters, this function
89+
// always creates a new client instance to maintain source information. The source parameter
90+
// identifies the origin or purpose of the client.
91+
//
92+
// Parameters:
93+
// - ctx: The context for managing the client's lifecycle.
94+
// - logger: The logger instance for logging client activities.
95+
// - appSettings: The application settings containing configuration details.
96+
//
97+
// Returns:
98+
// - p2p.ClientI: The newly created P2P client instance.
99+
// - error: An error object if the client creation fails; otherwise, nil.
100+
func (d *Stores) GetP2PClient(ctx context.Context, logger ulogger.Logger, appSettings *settings.Settings) (p2p.ClientI, error) {
101+
if d.mainP2PClient != nil {
102+
return d.mainP2PClient, nil
103+
}
104+
105+
p2pClient, err := p2p.NewClient(ctx, logger, appSettings)
106+
if err != nil {
107+
return nil, err
108+
}
109+
110+
d.mainP2PClient = p2pClient
111+
112+
return p2pClient, nil
113+
}
114+
86115
// GetBlockchainClient creates and returns a new blockchain client instance. Unlike other store
87116
// getters, this function always creates a new client instance to maintain source information.
88117
// The source parameter identifies the origin or purpose of the client.
@@ -376,6 +405,7 @@ func (d *Stores) Cleanup() {
376405
d.mainTxStore = nil
377406
d.mainUtxoStore = nil
378407
d.mainValidatorClient = nil
408+
d.mainP2PClient = nil
379409

380410
// Reset the Aerospike cleanup service singleton if it exists
381411
// This prevents state leakage between test runs

daemon/test_daemon.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,7 +1399,7 @@ func createAndSaveSubtrees(ctx context.Context, subtreeStore blob.Store, txs []*
13991399
}
14001400

14011401
// storeSubtreeFiles serializes and stores the subtree, subtree data, and subtree meta in the provided subtree store.
1402-
func storeSubtreeFiles(ctx context.Context, subtreeStore blob.Store, subtree *subtreepkg.Subtree, subtreeData *subtreepkg.SubtreeData, subtreeMeta *subtreepkg.SubtreeMeta) error {
1402+
func storeSubtreeFiles(ctx context.Context, subtreeStore blob.Store, subtree *subtreepkg.Subtree, subtreeData *subtreepkg.Data, subtreeMeta *subtreepkg.Meta) error {
14031403
subtreeBytes, err := subtree.Serialize()
14041404
if err != nil {
14051405
return err
@@ -1874,22 +1874,22 @@ func (td *TestDaemon) ConnectToPeer(t *testing.T, peer *TestDaemon) {
18741874

18751875
return
18761876
case <-ticker.C:
1877-
r, err := td.P2PClient.GetPeers(td.Ctx)
1877+
peers, err := td.P2PClient.GetPeers(td.Ctx)
18781878
if err != nil {
18791879
// If there's an error calling RPC, log it and continue retrying
18801880
t.Logf("Error calling getpeerinfo: %v. Retrying...", err)
18811881
continue
18821882
}
18831883

1884-
if len(r.Peers) == 0 {
1884+
if len(peers) == 0 {
18851885
t.Logf("getpeerinfo returned empty peer list. Retrying...")
18861886
continue
18871887
}
18881888

18891889
found := false
18901890

1891-
for _, p := range r.Peers {
1892-
if p != nil && p.Id == peer.Settings.P2P.PeerID {
1891+
for _, p := range peers {
1892+
if p != nil && p.ID.String() == peer.Settings.P2P.PeerID {
18931893
found = true
18941894
break
18951895
}

docs/references/services/blockvalidation_reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ Validates a block and returns validation results without adding it to the blockc
284284
#### processBlockFound
285285

286286
```go
287-
func (u *Server) processBlockFound(ctx context.Context, hash *chainhash.Hash, baseURL string, peerID string, useBlock ...*model.Block) error
287+
func (u *Server) processBlockFound(ctx context.Context, hash *chainhash.Hash, peerID string, baseURL string, useBlock ...*model.Block) error
288288
```
289289

290290
Internal method that processes a newly discovered block. Handles block retrieval, validation, and integration with the blockchain state.

docs/references/settings/services/p2p_settings.md

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,10 @@
2828
| BanDuration | time.Duration | 24h | p2p_ban_duration | Ban duration |
2929
| ForceSyncPeer | string | "" | p2p_force_sync_peer | **CRITICAL** - Forced sync peer override |
3030
| SharePrivateAddresses | bool | true | p2p_share_private_addresses | Private address advertisement |
31-
| PeerHealthCheckInterval | time.Duration | 30s | p2p_health_check_interval | **CRITICAL** - Health check timing |
32-
| PeerHealthHTTPTimeout | time.Duration | 5s | p2p_health_http_timeout | **CRITICAL** - Health check HTTP timeout |
33-
| PeerHealthRemoveAfterFailures | int | 3 | p2p_health_remove_after_failures | **CRITICAL** - Failure threshold for peer removal |
3431
| AllowPrunedNodeFallback | bool | true | p2p_allow_pruned_node_fallback | **CRITICAL** - Pruned node fallback behavior |
3532

3633
## Configuration Dependencies
3734

38-
### Peer Health Management
39-
- `PeerHealthCheckInterval`, `PeerHealthHTTPTimeout`, and `PeerHealthRemoveAfterFailures` work together
40-
- Controls peer health monitoring and removal behavior
41-
4235
### Forced Sync Peer Selection
4336
- `ForceSyncPeer` overrides automatic peer selection
4437
- `AllowPrunedNodeFallback` affects fallback behavior when forced peer unavailable

errors/Error_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var (
1717
ErrBlockInvalidFormat = New(ERR_BLOCK_INVALID_FORMAT, "block format is invalid")
1818
ErrBlockNotFound = New(ERR_BLOCK_NOT_FOUND, "block not found")
1919
ErrBlockParentNotMined = New(ERR_BLOCK_PARENT_NOT_MINED, "block parent not mined")
20+
ErrCatchupInProgress = New(ERR_CATCHUP_IN_PROGRESS, "catchup in progress")
2021
ErrConfiguration = New(ERR_CONFIGURATION, "configuration error")
2122
ErrContextCanceled = New(ERR_CONTEXT_CANCELED, "context canceled")
2223
ErrError = New(ERR_ERROR, "generic error")
@@ -286,6 +287,11 @@ func NewStateInitializationError(message string, params ...interface{}) *Error {
286287
return New(ERR_STATE_INITIALIZATION, message, params...)
287288
}
288289

290+
// NewCatchupInProgressError creates a new error with the catchup in progress error code.
291+
func NewCatchupInProgressError(message string, params ...interface{}) *Error {
292+
return New(ERR_CATCHUP_IN_PROGRESS, message, params...)
293+
}
294+
289295
// NewStateError creates a new error with the state error code.
290296
func NewStateError(message string, params ...interface{}) *Error {
291297
return New(ERR_STATE_ERROR, message, params...)

0 commit comments

Comments
 (0)