Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion services/p2p/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -454,7 +455,9 @@ func (s *Server) Health(ctx context.Context, checkLiveness bool) (int, string, e
// It performs initial setup of HTTP endpoints and sets configuration variables used during the main Start phase.
//
// The initialization process configures the service's public-facing HTTP address for asset discovery
// and prepares internal data structures and channels.
// and prepares internal data structures and channels. It also performs a health check on the configured
// asset address to ensure it is reachable. If the health check fails and the node is in full mode,
// it automatically switches to listen-only mode to prevent other peers from attempting to fetch data.
//
// Returns an error if any component initialization fails, or nil if successful.
func (s *Server) Init(ctx context.Context) (err error) {
Expand All @@ -467,9 +470,91 @@ func (s *Server) Init(ctx context.Context) (err error) {

s.AssetHTTPAddressURL = AssetHTTPAddressURLString

// Perform health check on the asset address if in full mode
// If the address is not reachable, automatically switch to listen-only mode
if s.settings.P2P.ListenMode == settings.ListenModeFull {
if !s.isAssetAddressReachable(ctx, s.AssetHTTPAddressURL) {
s.logger.Warnf("[Init] Asset HTTP address %s is not reachable. Automatically switching to listen-only mode to prevent peers from attempting to fetch data from this node. Please check your asset_httpPublicAddress configuration and ensure the endpoint is publicly accessible.", s.AssetHTTPAddressURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: Modifying s.settings.P2P.ListenMode after server initialization can cause inconsistencies. Settings should generally be immutable after configuration.

Suggestion: Consider storing the effective listen mode in a separate field (e.g., s.effectiveListenMode) rather than mutating the original settings. This preserves the user's intended configuration and makes debugging easier.

s.settings.P2P.ListenMode = settings.ListenModeListenOnly
} else {
s.logger.Infof("[Init] Asset HTTP address %s is reachable", s.AssetHTTPAddressURL)
}
}

return nil
}

// isAssetAddressReachable checks if the configured asset HTTP address is reachable.
// This performs a basic health check by attempting to fetch the genesis block from the asset server.
// It uses a timeout to prevent the initialization from hanging on unreachable endpoints.
//
// Returns true if the asset address is reachable and responds with a valid HTTP status code,
// false otherwise (network errors, timeouts, or server errors).
func (s *Server) isAssetAddressReachable(ctx context.Context, assetURL string) bool {
if assetURL == "" {
return false
}

// Get genesis hash for the health check
var genesisHash string
if s.settings != nil && s.settings.ChainCfgParams != nil && s.settings.ChainCfgParams.GenesisHash != nil {
genesisHash = s.settings.ChainCfgParams.GenesisHash.String()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: Using a hardcoded default genesis hash for regtest is brittle and may fail silently if the settings are unexpectedly nil.

Suggestion: Consider failing fast with an error if s.settings.ChainCfgParams.GenesisHash is nil, rather than falling back to a hardcoded value. This makes configuration issues visible immediately.

} else {
// Default to regtest genesis if not available
genesisHash = "18e7664a7abf9bb0e96b889eaa3cb723a89a15b610cc40538e5ebe3e9222e8d2"
}

blockURL := assetURL + "/block/" + genesisHash

// Create request with timeout context (5 seconds should be sufficient for a local/public endpoint)
timeout := 5 * time.Second
checkCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

req, err := http.NewRequestWithContext(checkCtx, "GET", blockURL, nil)
if err != nil {
s.logger.Debugf("[Init] Failed to create health check request for %s: %v", assetURL, err)
return false
}

client := &http.Client{
Timeout: timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
// Allow redirects but limit to 3 to prevent loops
if len(via) >= 3 {
return http.ErrUseLastResponse
}
return nil
},
}

resp, err := client.Do(req)
if err != nil {
s.logger.Debugf("[Init] Asset address %s health check failed: %v", assetURL, err)
return false
}
defer resp.Body.Close()

s.logger.Debugf("[Init] Asset address %s health check responded with status %d", assetURL, resp.StatusCode)

// Check for offline indicators in 404 responses
if resp.StatusCode == 404 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Why limit to only the first 1024 bytes? If the response body is larger, the offline detection keywords might be missed if they appear later in the response.

Suggestion: Consider reading more (e.g., 8KB) or reading the entire response with a reasonable size limit to ensure reliable offline detection.

body, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
if err == nil {
bodyStr := strings.ToLower(string(body))
if strings.Contains(bodyStr, "offline") ||
strings.Contains(bodyStr, "tunnel not found") {
s.logger.Debugf("[Init] Asset address %s appears to be offline based on 404 response", assetURL)
return false
}
}
}

// Consider 2xx, 3xx, and 4xx (except offline 404s) as reachable
// This is consistent with peer health checking logic
return resp.StatusCode < 500
}

// Start begins the P2P server operations and starts listening for connections.
// This method is the main entry point for activating the P2P network functionality.
// It performs several key operations:
Expand Down
201 changes: 201 additions & 0 deletions services/p2p/Server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2190,6 +2190,207 @@ func TestServerInitHTTPPublicAddressEmpty(t *testing.T) {
require.Equal(t, "http://fallback.example.com", server.AssetHTTPAddressURL)
}

// TestServerInitAssetHealthCheck_Reachable tests that when the asset address is reachable,
// the server remains in full mode
func TestServerInitAssetHealthCheck_Reachable(t *testing.T) {
// Create test HTTP server that returns success
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
}))
defer testServer.Close()

ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = testServer.URL
testSettings.P2P.ListenMode = settings.ListenModeFull
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should remain in full mode since asset address is reachable
require.Equal(t, settings.ListenModeFull, server.settings.P2P.ListenMode)
require.Equal(t, testServer.URL, server.AssetHTTPAddressURL)
}

// TestServerInitAssetHealthCheck_Unreachable tests that when the asset address is unreachable,
// the server automatically switches to listen-only mode
func TestServerInitAssetHealthCheck_Unreachable(t *testing.T) {
ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = "http://localhost:65535" // Unreachable port
testSettings.P2P.ListenMode = settings.ListenModeFull
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should automatically switch to listen-only mode
require.Equal(t, settings.ListenModeListenOnly, server.settings.P2P.ListenMode)
require.Equal(t, "http://localhost:65535", server.AssetHTTPAddressURL)
}

// TestServerInitAssetHealthCheck_ServerError tests that 5xx errors are treated as unreachable
func TestServerInitAssetHealthCheck_ServerError(t *testing.T) {
// Create test HTTP server that returns 500 error
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("Internal Server Error"))
}))
defer testServer.Close()

ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = testServer.URL
testSettings.P2P.ListenMode = settings.ListenModeFull
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should automatically switch to listen-only mode due to 5xx error
require.Equal(t, settings.ListenModeListenOnly, server.settings.P2P.ListenMode)
}

// TestServerInitAssetHealthCheck_OfflineDetection tests that offline 404 responses are detected
func TestServerInitAssetHealthCheck_OfflineDetection(t *testing.T) {
// Create test HTTP server that returns offline indication
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte("Tunnel offline"))
}))
defer testServer.Close()

ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = testServer.URL
testSettings.P2P.ListenMode = settings.ListenModeFull
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should automatically switch to listen-only mode due to offline detection
require.Equal(t, settings.ListenModeListenOnly, server.settings.P2P.ListenMode)
}

// TestServerInitAssetHealthCheck_Normal404 tests that normal 404s are still considered reachable
func TestServerInitAssetHealthCheck_Normal404(t *testing.T) {
// Create test HTTP server that returns normal 404
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte("Not Found"))
}))
defer testServer.Close()

ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = testServer.URL
testSettings.P2P.ListenMode = settings.ListenModeFull
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should remain in full mode since normal 404 is considered reachable
require.Equal(t, settings.ListenModeFull, server.settings.P2P.ListenMode)
}

// TestServerInitAssetHealthCheck_ListenOnlyMode tests that listen-only mode nodes skip health check
func TestServerInitAssetHealthCheck_ListenOnlyMode(t *testing.T) {
ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = "http://localhost:65535" // Unreachable, but shouldn't matter
testSettings.P2P.ListenMode = settings.ListenModeListenOnly // Already in listen-only mode
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should remain in listen-only mode (health check should be skipped)
require.Equal(t, settings.ListenModeListenOnly, server.settings.P2P.ListenMode)
}

// TestServerInitAssetHealthCheck_EmptyAddress tests that empty asset address switches to listen-only
func TestServerInitAssetHealthCheck_EmptyAddress(t *testing.T) {
ctx := context.Background()
logger := ulogger.New("test-server")
mockClient := &blockchain.Mock{}

testSettings := createBaseTestSettings()
testSettings.P2P.PrivateKey = "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdefabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
testSettings.Asset.HTTPPublicAddress = ""
testSettings.Asset.HTTPAddress = ""
testSettings.P2P.ListenMode = settings.ListenModeFull
testSettings.BlockChain.StoreURL = &url.URL{
Scheme: "sqlitememory",
}

server, err := NewServer(ctx, logger, testSettings, mockClient, nil, nil, nil, nil, nil, nil)
require.NoError(t, err)

err = server.Init(ctx)
require.NoError(t, err)

// Should automatically switch to listen-only mode since address is empty
require.Equal(t, settings.ListenModeListenOnly, server.settings.P2P.ListenMode)
}

func TestServerSetupHTTPServer(t *testing.T) {
ctx := context.Background()
logger := ulogger.New("test-logger")
Expand Down