Skip to content

Commit 5b92b16

Browse files
committed
more bug fixes
1 parent 347cd81 commit 5b92b16

9 files changed

Lines changed: 413 additions & 302 deletions

File tree

cmd/push-validator/cmd_balance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func handleBalance(d *Deps, args []string) error {
3939
convCancel()
4040
if convErr != nil {
4141
if flagOutput == "json" { d.Printer.JSON(map[string]any{"ok": false, "error": convErr.Error(), "address": addr}) } else { d.Printer.Error(fmt.Sprintf("address conversion error: %v", convErr)) }
42-
return convErr
42+
return silentErr{convErr}
4343
}
4444
addr = bech32Addr
4545
}

cmd/push-validator/cmd_logs.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func handleLogsCore(sup process.Supervisor, deps logDeps) error {
9191

9292
return deps.runLogUI(ctx, ui.LogUIOptions{
9393
LogPath: lp,
94-
BgKey: 'b',
9594
ShowFooter: interactive,
9695
NoColor: flagNoColor,
9796
})

cmd/push-validator/cmd_start.go

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,39 @@ var startCmd = &cobra.Command{
101101
}
102102
}
103103

104+
// If node is initialized but data is empty (e.g., post-reset), restore snapshot
105+
if !needsInit && !snapshot.IsSnapshotPresent(cfg.HomeDir) {
106+
if flagOutput != "json" {
107+
p.Info("No blockchain data found — restoring from snapshot...")
108+
fmt.Println()
109+
}
110+
snapshotSvc := snapshot.New()
111+
if err := snapshotSvc.Download(cmd.Context(), snapshot.Options{
112+
SnapshotURL: cfg.SnapshotURL,
113+
HomeDir: cfg.HomeDir,
114+
Progress: createSnapshotProgressCallback(flagOutput),
115+
}); err != nil {
116+
return fmt.Errorf("download snapshot: %w", err)
117+
}
118+
if err := snapshotSvc.Extract(cmd.Context(), snapshot.ExtractOptions{
119+
HomeDir: cfg.HomeDir,
120+
TargetDir: filepath.Join(cfg.HomeDir, "data"),
121+
Progress: createSnapshotProgressCallback(flagOutput),
122+
}); err != nil {
123+
return fmt.Errorf("extract snapshot: %w", err)
124+
}
125+
// Ensure priv_validator_state.json exists after extraction
126+
pvsPath := filepath.Join(cfg.HomeDir, "data", "priv_validator_state.json")
127+
if _, err := os.Stat(pvsPath); os.IsNotExist(err) {
128+
_ = os.WriteFile(pvsPath, []byte(`{"height":"0","round":0,"step":0}` + "\n"), 0o644)
129+
}
130+
if flagOutput != "json" {
131+
fmt.Println()
132+
p.Success("Snapshot restored")
133+
fmt.Println()
134+
}
135+
}
136+
104137
// Verify cosmovisor is available
105138
detection := cosmovisor.Detect(cfg.HomeDir)
106139
if !detection.Available {
@@ -226,10 +259,20 @@ func handlePostStartFlow(cfg config.Config, p *ui.Printer) bool {
226259
return fmt.Errorf("reset failed: %w", err)
227260
}
228261

229-
// Recreate priv_validator_state.json
230-
pvs := filepath.Join(cfg.HomeDir, "data", "priv_validator_state.json")
231-
if err := os.WriteFile(pvs, []byte("{\n \"height\": \"0\",\n \"round\": 0,\n \"step\": 0\n}\n"), 0o644); err != nil {
232-
return fmt.Errorf("failed to create priv_validator_state.json: %w", err)
262+
// Restore snapshot before restarting (node cannot start from genesis)
263+
fmt.Println(p.Colors.Info(" Restoring snapshot..."))
264+
snapshotSvc := snapshot.New()
265+
if err := snapshotSvc.Download(context.Background(), snapshot.Options{
266+
SnapshotURL: cfg.SnapshotURL,
267+
HomeDir: cfg.HomeDir,
268+
}); err != nil {
269+
return fmt.Errorf("snapshot download failed: %w", err)
270+
}
271+
if err := snapshotSvc.Extract(context.Background(), snapshot.ExtractOptions{
272+
HomeDir: cfg.HomeDir,
273+
TargetDir: filepath.Join(cfg.HomeDir, "data"),
274+
}); err != nil {
275+
return fmt.Errorf("snapshot extract failed: %w", err)
233276
}
234277

235278
fmt.Println(p.Colors.Info(" Restarting node..."))
@@ -245,9 +288,6 @@ func handlePostStartFlow(cfg config.Config, p *ui.Printer) bool {
245288
return nil
246289
}
247290

248-
// Note: With snapshot download, we don't need state sync reconfigure.
249-
// The node already has data from the snapshot and just needs to catch up via block sync.
250-
251291
if err := syncmon.RunWithRetry(context.Background(), syncmon.RetryOptions{
252292
Options: syncmon.Options{
253293
LocalRPC: "http://127.0.0.1:26657",
@@ -432,9 +472,9 @@ func checkValidatorRegistration(v validator.Service, maxRetries int) valCheckRes
432472
type postStartAction string
433473

434474
const (
435-
actionShowDashboard postStartAction = "show_dashboard"
436-
actionPromptRegister postStartAction = "prompt_register"
437-
actionShowSteps postStartAction = "show_steps"
475+
actionShowDashboard postStartAction = "show_dashboard"
476+
actionPromptRegister postStartAction = "prompt_register"
477+
actionShowSteps postStartAction = "show_steps"
438478
)
439479

440480
// computePostStartDecision determines what to do based on validator check results and interactivity.

internal/bootstrap/bootstrap.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,15 @@ func (s *svc) Init(ctx context.Context, opts Options) error {
197197
return fmt.Errorf("download snapshot: %w", err)
198198
}
199199

200+
progress("Extracting snapshot...")
201+
if err := s.snapshot.Extract(ctx, snapshot.ExtractOptions{
202+
HomeDir: opts.HomeDir,
203+
TargetDir: filepath.Join(opts.HomeDir, "data"),
204+
Progress: opts.SnapshotProgress,
205+
}); err != nil {
206+
return fmt.Errorf("extract snapshot: %w", err)
207+
}
208+
200209
// Mark successful snapshot download
201210
_ = os.WriteFile(filepath.Join(opts.HomeDir, ".snapshot_downloaded"), []byte(time.Now().Format(time.RFC3339)), 0o644)
202211

internal/node/ws.go

Lines changed: 118 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,107 +1,136 @@
11
package node
22

33
import (
4-
"context"
5-
"encoding/json"
6-
"fmt"
7-
"net/url"
8-
"time"
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/url"
8+
"time"
99

10-
"github.com/gorilla/websocket"
10+
"github.com/gorilla/websocket"
1111
)
1212

1313
// DialAndSubscribeHeaders uses gorilla/websocket to subscribe to NewBlockHeader events and stream heights.
1414
func DialAndSubscribeHeaders(ctx context.Context, wsURL string) (<-chan Header, error) {
15-
u, err := url.Parse(wsURL)
16-
if err != nil { return nil, err }
17-
if u.Path == "" { u.Path = "/websocket" }
15+
u, err := url.Parse(wsURL)
16+
if err != nil {
17+
return nil, err
18+
}
19+
if u.Path == "" {
20+
u.Path = "/websocket"
21+
}
1822

19-
d := websocket.Dialer{
20-
Subprotocols: []string{"jsonrpc"},
21-
HandshakeTimeout: 5 * time.Second,
22-
EnableCompression: false,
23-
}
24-
// nolint:bodyclose
25-
conn, _, err := d.DialContext(ctx, u.String(), map[string][]string{"Origin": {"http://localhost"}})
26-
if err != nil { return nil, err }
23+
d := websocket.Dialer{
24+
Subprotocols: []string{"jsonrpc"},
25+
HandshakeTimeout: 5 * time.Second,
26+
EnableCompression: false,
27+
}
28+
// nolint:bodyclose
29+
conn, _, err := d.DialContext(ctx, u.String(), map[string][]string{"Origin": {"http://localhost"}})
30+
if err != nil {
31+
return nil, err
32+
}
2733

28-
// Send subscribe request
29-
sub := map[string]any{
30-
"jsonrpc": "2.0",
31-
"method": "subscribe",
32-
// Prefer cometbft.event key for 0.38+; servers typically support both.
33-
"params": map[string]string{"query": "cometbft.event='NewBlockHeader'"},
34-
"id": 1,
35-
}
36-
if err := conn.WriteJSON(sub); err != nil { _ = conn.Close(); return nil, err }
34+
// Send subscribe request
35+
sub := map[string]any{
36+
"jsonrpc": "2.0",
37+
"method": "subscribe",
38+
// Prefer cometbft.event key for 0.38+; servers typically support both.
39+
"params": map[string]string{"query": "cometbft.event='NewBlockHeader'"},
40+
"id": 1,
41+
}
42+
if err := conn.WriteJSON(sub); err != nil {
43+
_ = conn.Close()
44+
return nil, err
45+
}
3746

38-
out := make(chan Header, 32)
39-
go func() {
40-
defer close(out)
41-
defer func() {
42-
// attempt proper close handshake
43-
deadline := time.Now().Add(1500 * time.Millisecond)
44-
_ = conn.SetWriteDeadline(deadline)
45-
_ = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline)
46-
// best-effort wait for server close
47-
_ = conn.SetReadDeadline(deadline)
48-
_, _, _ = conn.ReadMessage()
49-
_ = conn.Close()
50-
}()
51-
for {
52-
select {
53-
case <-ctx.Done():
54-
return
55-
default:
56-
}
57-
// Read next message
58-
// Read next message (gorilla handles ping/pong)
59-
_, msg, err := conn.ReadMessage()
60-
if err != nil {
61-
// graceful exits on normal closure or going away
62-
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
63-
return
64-
}
65-
// any other error: return and let defer handle close
66-
return
67-
}
68-
if h, ok := parseHeaderHeight(msg); ok { out <- h; continue }
69-
// handle pong/ping implicitly via gorilla; ignore others
70-
}
71-
}()
72-
return out, nil
47+
// Read deadline is a safety net for TCP connections that don't close properly.
48+
// During block sync, WS silence for minutes is normal (events only at consensus tip).
49+
const readTimeout = 5 * time.Minute
50+
51+
out := make(chan Header, 32)
52+
go func() {
53+
defer close(out)
54+
defer func() {
55+
// attempt proper close handshake
56+
deadline := time.Now().Add(1500 * time.Millisecond)
57+
_ = conn.SetWriteDeadline(deadline)
58+
_ = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline)
59+
// best-effort wait for server close
60+
_ = conn.SetReadDeadline(deadline)
61+
_, _, _ = conn.ReadMessage()
62+
_ = conn.Close()
63+
}()
64+
for {
65+
select {
66+
case <-ctx.Done():
67+
return
68+
default:
69+
}
70+
// Set read deadline before each read; reset on success
71+
_ = conn.SetReadDeadline(time.Now().Add(readTimeout))
72+
_, msg, err := conn.ReadMessage()
73+
if err != nil {
74+
// graceful exits on normal closure or going away
75+
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
76+
return
77+
}
78+
// any other error (timeout, connection reset): exit
79+
return
80+
}
81+
if h, ok := parseHeaderHeight(msg); ok {
82+
out <- h
83+
continue
84+
}
85+
// handle pong/ping implicitly via gorilla; ignore others
86+
}
87+
}()
88+
return out, nil
7389
}
7490

7591
func parseHeaderHeight(b []byte) (Header, bool) {
76-
var payload struct {
77-
Result struct {
78-
Data struct {
79-
Value struct {
80-
Header struct {
81-
Height string `json:"height"`
82-
Time time.Time `json:"time"`
83-
} `json:"header"`
84-
} `json:"value"`
85-
} `json:"data"`
86-
} `json:"result"`
87-
}
88-
if err := json.Unmarshal(b, &payload); err != nil { return Header{}, false }
89-
if payload.Result.Data.Value.Header.Height == "" { return Header{}, false }
90-
// Accept both tm.event and cometbft.event streams; height parse only
91-
h, err := strconvParseInt(payload.Result.Data.Value.Header.Height)
92-
if err != nil { return Header{}, false }
93-
return Header{Height: h, Time: payload.Result.Data.Value.Header.Time}, true
92+
var payload struct {
93+
Result struct {
94+
Data struct {
95+
Value struct {
96+
Header struct {
97+
Height string `json:"height"`
98+
Time time.Time `json:"time"`
99+
} `json:"header"`
100+
} `json:"value"`
101+
} `json:"data"`
102+
} `json:"result"`
103+
}
104+
if err := json.Unmarshal(b, &payload); err != nil {
105+
return Header{}, false
106+
}
107+
if payload.Result.Data.Value.Header.Height == "" {
108+
return Header{}, false
109+
}
110+
// Accept both tm.event and cometbft.event streams; height parse only
111+
h, err := strconvParseInt(payload.Result.Data.Value.Header.Height)
112+
if err != nil {
113+
return Header{}, false
114+
}
115+
return Header{Height: h, Time: payload.Result.Data.Value.Header.Time}, true
94116
}
95117

96118
func strconvParseInt(s string) (int64, error) {
97-
var n int64
98-
var sign int64 = 1
99-
if s == "" { return 0, fmt.Errorf("empty") }
100-
if s[0] == '-' { sign = -1; s = s[1:] }
101-
for i := 0; i < len(s); i++ {
102-
c := s[i]
103-
if c < '0' || c > '9' { return 0, fmt.Errorf("invalid") }
104-
n = n*10 + int64(c-'0')
105-
}
106-
return sign * n, nil
119+
var n int64
120+
var sign int64 = 1
121+
if s == "" {
122+
return 0, fmt.Errorf("empty")
123+
}
124+
if s[0] == '-' {
125+
sign = -1
126+
s = s[1:]
127+
}
128+
for i := 0; i < len(s); i++ {
129+
c := s[i]
130+
if c < '0' || c > '9' {
131+
return 0, fmt.Errorf("invalid")
132+
}
133+
n = n*10 + int64(c-'0')
134+
}
135+
return sign * n, nil
107136
}

0 commit comments

Comments
 (0)