From d3b150cf63c3feaa21a4e047e1085804934e0b91 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 28 Nov 2024 10:30:15 +0530 Subject: [PATCH 01/19] add netwok tag to metrics --- metrics/metrics.go | 148 +++++++++++++++++++++++++++++++++------------ 1 file changed, 109 insertions(+), 39 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index c47642dc4a2..71047cdbefd 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -30,6 +30,7 @@ var ( Version, _ = tag.NewKey("version") Commit, _ = tag.NewKey("commit") NodeType, _ = tag.NewKey("node_type") + Network, _ = tag.NewKey("network") PeerID, _ = tag.NewKey("peer_id") MinerID, _ = tag.NewKey("miner_id") FailureType, _ = tag.NewKey("failure_type") @@ -192,40 +193,46 @@ var ( Description: "Lotus node information", Measure: LotusInfo, Aggregation: view.LastValue(), - TagKeys: []tag.Key{Version, Commit, NodeType}, + TagKeys: []tag.Key{Version, Commit, NodeType, Network}, } ChainNodeHeightView = &view.View{ Measure: ChainNodeHeight, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } ChainNodeHeightExpectedView = &view.View{ Measure: ChainNodeHeightExpected, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } ChainNodeWorkerHeightView = &view.View{ Measure: ChainNodeWorkerHeight, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } BlockReceivedView = &view.View{ Measure: BlockReceived, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } BlockValidationFailureView = &view.View{ Measure: BlockValidationFailure, Aggregation: view.Count(), - TagKeys: []tag.Key{FailureType}, + TagKeys: []tag.Key{FailureType, Network}, } BlockValidationSuccessView = &view.View{ Measure: BlockValidationSuccess, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } BlockValidationDurationView = &view.View{ Measure: BlockValidationDurationMilliseconds, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } BlockDelayView = &view.View{ Measure: BlockDelay, - TagKeys: []tag.Key{MinerID}, + TagKeys: []tag.Key{MinerID, Network}, Aggregation: func() *view.Aggregation { var bounds []float64 for i := 5; i < 29; i++ { // 5-29s, step 1s @@ -244,398 +251,461 @@ var ( IndexerMessageValidationFailureView = &view.View{ Measure: IndexerMessageValidationFailure, Aggregation: view.Count(), - TagKeys: []tag.Key{FailureType, Local}, + TagKeys: []tag.Key{FailureType, Local, Network}, } IndexerMessageValidationSuccessView = &view.View{ Measure: IndexerMessageValidationSuccess, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } MessagePublishedView = &view.View{ Measure: MessagePublished, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } MessageReceivedView = &view.View{ Measure: MessageReceived, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } MessageValidationFailureView = &view.View{ Measure: MessageValidationFailure, Aggregation: view.Count(), - TagKeys: []tag.Key{FailureType, Local}, + TagKeys: []tag.Key{FailureType, Local, Network}, } MessageValidationSuccessView = &view.View{ Measure: MessageValidationSuccess, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } MessageValidationDurationView = &view.View{ Measure: MessageValidationDuration, Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{MsgValid, Local}, + TagKeys: []tag.Key{MsgValid, Local, Network}, } MpoolGetNonceDurationView = &view.View{ Measure: MpoolGetNonceDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } MpoolGetBalanceDurationView = &view.View{ Measure: MpoolGetBalanceDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } MpoolAddTsDurationView = &view.View{ Measure: MpoolAddTsDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } MpoolAddDurationView = &view.View{ Measure: MpoolAddDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } MpoolPushDurationView = &view.View{ Measure: MpoolPushDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } MpoolMessageCountView = &view.View{ Measure: MpoolMessageCount, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } PeerCountView = &view.View{ Measure: PeerCount, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } PubsubPublishMessageView = &view.View{ Measure: PubsubPublishMessage, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubDeliverMessageView = &view.View{ Measure: PubsubDeliverMessage, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubRejectMessageView = &view.View{ Measure: PubsubRejectMessage, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubDuplicateMessageView = &view.View{ Measure: PubsubDuplicateMessage, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubPruneMessageView = &view.View{ Measure: PubsubPruneMessage, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubRecvRPCView = &view.View{ Measure: PubsubRecvRPC, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubSendRPCView = &view.View{ Measure: PubsubSendRPC, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } PubsubDropRPCView = &view.View{ Measure: PubsubDropRPC, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } APIRequestDurationView = &view.View{ Measure: APIRequestDuration, Aggregation: defaultMillisecondsDistribution, - TagKeys: []tag.Key{APIInterface, Endpoint}, + TagKeys: []tag.Key{APIInterface, Endpoint, Network}, } VMFlushCopyDurationView = &view.View{ Measure: VMFlushCopyDuration, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } VMFlushCopyCountView = &view.View{ Measure: VMFlushCopyCount, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } VMApplyBlocksTotalView = &view.View{ Measure: VMApplyBlocksTotal, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } VMApplyMessagesView = &view.View{ Measure: VMApplyMessages, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } VMApplyEarlyView = &view.View{ Measure: VMApplyEarly, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } VMApplyCronView = &view.View{ Measure: VMApplyCron, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } VMApplyFlushView = &view.View{ Measure: VMApplyFlush, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } VMSendsView = &view.View{ Measure: VMSends, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } VMAppliedView = &view.View{ Measure: VMApplied, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } VMExecutionWaitingView = &view.View{ Measure: VMExecutionWaiting, Aggregation: view.Sum(), - TagKeys: []tag.Key{ExecutionLane}, + TagKeys: []tag.Key{ExecutionLane, Network}, } VMExecutionRunningView = &view.View{ Measure: VMExecutionRunning, Aggregation: view.Sum(), - TagKeys: []tag.Key{ExecutionLane}, + TagKeys: []tag.Key{ExecutionLane, Network}, } // miner WorkerCallsStartedView = &view.View{ Measure: WorkerCallsStarted, Aggregation: view.Count(), - TagKeys: []tag.Key{TaskType, WorkerHostname}, + TagKeys: []tag.Key{TaskType, WorkerHostname, Network}, } WorkerCallsReturnedCountView = &view.View{ Measure: WorkerCallsReturnedCount, Aggregation: view.Count(), - TagKeys: []tag.Key{TaskType, WorkerHostname}, + + TagKeys: []tag.Key{TaskType, WorkerHostname, Network}, } WorkerUntrackedCallsReturnedView = &view.View{ - Measure: WorkerUntrackedCallsReturned, + Measure: WorkerUntrackedCallsReturned, + Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } WorkerCallsReturnedDurationView = &view.View{ Measure: WorkerCallsReturnedDuration, Aggregation: workMillisecondsDistribution, - TagKeys: []tag.Key{TaskType, WorkerHostname}, + TagKeys: []tag.Key{TaskType, WorkerHostname, Network}, } SectorStatesView = &view.View{ Measure: SectorStates, Aggregation: view.LastValue(), - TagKeys: []tag.Key{SectorState}, + TagKeys: []tag.Key{SectorState, Network}, } StorageFSAvailableView = &view.View{ Measure: StorageFSAvailable, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageAvailableView = &view.View{ Measure: StorageAvailable, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageReservedView = &view.View{ Measure: StorageReserved, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageLimitUsedView = &view.View{ Measure: StorageLimitUsed, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageCapacityBytesView = &view.View{ Measure: StorageCapacityBytes, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageFSAvailableBytesView = &view.View{ Measure: StorageFSAvailableBytes, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageAvailableBytesView = &view.View{ Measure: StorageAvailableBytes, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageReservedBytesView = &view.View{ Measure: StorageReservedBytes, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageLimitUsedBytesView = &view.View{ Measure: StorageLimitUsedBytes, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } StorageLimitMaxBytesView = &view.View{ Measure: StorageLimitMaxBytes, Aggregation: view.LastValue(), - TagKeys: []tag.Key{StorageID, PathStorage, PathSeal}, + TagKeys: []tag.Key{StorageID, PathStorage, PathSeal, Network}, } SchedAssignerCycleDurationView = &view.View{ Measure: SchedAssignerCycleDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } SchedAssignerCandidatesDurationView = &view.View{ Measure: SchedAssignerCandidatesDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } SchedAssignerWindowSelectionDurationView = &view.View{ Measure: SchedAssignerWindowSelectionDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } SchedAssignerSubmitDurationView = &view.View{ Measure: SchedAssignerSubmitDuration, Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{Network}, } SchedCycleOpenWindowsView = &view.View{ Measure: SchedCycleOpenWindows, Aggregation: queueSizeDistribution, + TagKeys: []tag.Key{Network}, } SchedCycleQueueSizeView = &view.View{ Measure: SchedCycleQueueSize, Aggregation: queueSizeDistribution, + TagKeys: []tag.Key{Network}, } DagStorePRInitCountView = &view.View{ Measure: DagStorePRInitCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } DagStorePRBytesRequestedView = &view.View{ Measure: DagStorePRBytesRequested, Aggregation: view.Sum(), - TagKeys: []tag.Key{PRReadType}, + TagKeys: []tag.Key{PRReadType, Network}, } DagStorePRBytesDiscardedView = &view.View{ Measure: DagStorePRBytesDiscarded, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } DagStorePRDiscardCountView = &view.View{ Measure: DagStorePRDiscardCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } DagStorePRSeekBackCountView = &view.View{ Measure: DagStorePRSeekBackCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } DagStorePRSeekForwardCountView = &view.View{ Measure: DagStorePRSeekForwardCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } DagStorePRSeekBackBytesView = &view.View{ Measure: DagStorePRSeekBackBytes, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } DagStorePRSeekForwardBytesView = &view.View{ Measure: DagStorePRSeekForwardBytes, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } DagStorePRAtHitBytesView = &view.View{ Measure: DagStorePRAtHitBytes, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } DagStorePRAtHitCountView = &view.View{ Measure: DagStorePRAtHitCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } DagStorePRAtCacheFillCountView = &view.View{ Measure: DagStorePRAtCacheFillCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } DagStorePRAtReadBytesView = &view.View{ Measure: DagStorePRAtReadBytes, Aggregation: view.Sum(), - TagKeys: []tag.Key{PRReadSize}, + TagKeys: []tag.Key{PRReadSize, Network}, } DagStorePRAtReadCountView = &view.View{ Measure: DagStorePRAtReadCount, Aggregation: view.Count(), - TagKeys: []tag.Key{PRReadSize}, + TagKeys: []tag.Key{PRReadSize, Network}, } // splitstore SplitstoreMissView = &view.View{ Measure: SplitstoreMiss, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } SplitstoreCompactionTimeSecondsView = &view.View{ Measure: SplitstoreCompactionTimeSeconds, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } SplitstoreCompactionHotView = &view.View{ Measure: SplitstoreCompactionHot, Aggregation: view.LastValue(), + TagKeys: []tag.Key{Network}, } SplitstoreCompactionColdView = &view.View{ Measure: SplitstoreCompactionCold, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } SplitstoreCompactionDeadView = &view.View{ Measure: SplitstoreCompactionDead, Aggregation: view.Sum(), + TagKeys: []tag.Key{Network}, } // rcmgr RcmgrAllowConnView = &view.View{ Measure: RcmgrAllowConn, Aggregation: view.Count(), - TagKeys: []tag.Key{Direction, UseFD}, + TagKeys: []tag.Key{Direction, UseFD, Network}, } RcmgrBlockConnView = &view.View{ Measure: RcmgrBlockConn, Aggregation: view.Count(), - TagKeys: []tag.Key{Direction, UseFD}, + TagKeys: []tag.Key{Direction, UseFD, Network}, } RcmgrAllowStreamView = &view.View{ Measure: RcmgrAllowStream, Aggregation: view.Count(), - TagKeys: []tag.Key{PeerID, Direction}, + TagKeys: []tag.Key{PeerID, Direction, Network}, } RcmgrBlockStreamView = &view.View{ Measure: RcmgrBlockStream, Aggregation: view.Count(), - TagKeys: []tag.Key{PeerID, Direction}, + TagKeys: []tag.Key{PeerID, Direction, Network}, } RcmgrAllowPeerView = &view.View{ Measure: RcmgrAllowPeer, Aggregation: view.Count(), - TagKeys: []tag.Key{PeerID}, + TagKeys: []tag.Key{PeerID, Network}, } RcmgrBlockPeerView = &view.View{ Measure: RcmgrBlockPeer, Aggregation: view.Count(), - TagKeys: []tag.Key{PeerID}, + + TagKeys: []tag.Key{PeerID, Network}, } RcmgrAllowProtoView = &view.View{ Measure: RcmgrAllowProto, Aggregation: view.Count(), - TagKeys: []tag.Key{ProtocolID}, + + TagKeys: []tag.Key{ProtocolID, Network}, } RcmgrBlockProtoView = &view.View{ Measure: RcmgrBlockProto, Aggregation: view.Count(), - TagKeys: []tag.Key{ProtocolID}, + + TagKeys: []tag.Key{ProtocolID, Network}, } RcmgrBlockProtoPeerView = &view.View{ Measure: RcmgrBlockProtoPeer, Aggregation: view.Count(), - TagKeys: []tag.Key{ProtocolID, PeerID}, + + TagKeys: []tag.Key{ProtocolID, PeerID, Network}, } RcmgrAllowSvcView = &view.View{ Measure: RcmgrAllowSvc, Aggregation: view.Count(), - TagKeys: []tag.Key{ServiceID}, + + TagKeys: []tag.Key{ServiceID, Network}, } RcmgrBlockSvcView = &view.View{ Measure: RcmgrBlockSvc, Aggregation: view.Count(), - TagKeys: []tag.Key{ServiceID}, + + TagKeys: []tag.Key{ServiceID, Network}, } RcmgrBlockSvcPeerView = &view.View{ Measure: RcmgrBlockSvcPeer, Aggregation: view.Count(), - TagKeys: []tag.Key{ServiceID, PeerID}, + + TagKeys: []tag.Key{ServiceID, PeerID, Network}, } RcmgrAllowMemView = &view.View{ Measure: RcmgrAllowMem, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } RcmgrBlockMemView = &view.View{ Measure: RcmgrBlockMem, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } RateLimitedView = &view.View{ Measure: RateLimitCount, Aggregation: view.Count(), + TagKeys: []tag.Key{Network}, } ) From 747f08782ccc6528a38f67a2dd80ae3f92c7a53a Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Fri, 29 Nov 2024 12:28:18 +0530 Subject: [PATCH 02/19] Enhance metrics by adding network tag to daemon and miner commands --- cmd/lotus-miner/run.go | 4 ++++ cmd/lotus/daemon.go | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go index e09968165b9..1fea55b6b66 100644 --- a/cmd/lotus-miner/run.go +++ b/cmd/lotus-miner/run.go @@ -4,6 +4,7 @@ import ( "fmt" _ "net/http/pprof" "os" + "strings" "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" @@ -16,6 +17,7 @@ import ( "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/build/buildconstants" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" @@ -55,10 +57,12 @@ var runCmd = &cli.Command{ } } + network := strings.Split(string(buildconstants.BuildTypeString()), "+")[1] ctx, _ := tag.New(lcli.DaemonContext(cctx), tag.Insert(metrics.Version, build.MinerBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "miner"), + tag.Insert(metrics.Network, network), ) // Register all metric views if err := view.Register( diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index b7fbd63e695..629fc58364b 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -207,11 +207,12 @@ var DaemonCmd = &cli.Command{ default: return fmt.Errorf("unrecognized profile type: %q", profile) } - + network := strings.Split(string(buildconstants.BuildTypeString()), "+")[1] ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.NodeBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "chain"), + tag.Insert(metrics.Network, network), ) // Register all metric views if err = view.Register( From e420b877bdb19f60d6a157c98a0fbfbc6d6052f3 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Fri, 29 Nov 2024 12:29:54 +0530 Subject: [PATCH 03/19] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5db5a4b1977..272b8c68e87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ - The miner actor builtin `QAPowerForWeight` no longer accepts the unused "dealWeight" parameter, the function signature now only takes 3 arguments: sectorSize, sectorDuration, verifiedWeight. ([filecoin-project/lotus#12445](https://github.com/filecoin-project/lotus/pull/12445)) +- Lotus now reports the network name in the metrics. ([filecoin-project/lotus#12733](https://github.com/filecoin-project/lotus/pull/12733)) + ## Bug Fixes - Make `EthTraceFilter` / `trace_filter` skip null rounds instead of erroring. ([filecoin-project/lotus#12702](https://github.com/filecoin-project/lotus/pull/12702)) From 83a2bca12274cbe68b07845a18b26df4b2c57909 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Fri, 29 Nov 2024 12:40:09 +0530 Subject: [PATCH 04/19] minor fix --- cmd/lotus-miner/run.go | 2 +- cmd/lotus/daemon.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go index 1fea55b6b66..329eb9a5068 100644 --- a/cmd/lotus-miner/run.go +++ b/cmd/lotus-miner/run.go @@ -57,7 +57,7 @@ var runCmd = &cli.Command{ } } - network := strings.Split(string(buildconstants.BuildTypeString()), "+")[1] + network := strings.Split(buildconstants.BuildTypeString(), "+")[1] ctx, _ := tag.New(lcli.DaemonContext(cctx), tag.Insert(metrics.Version, build.MinerBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 629fc58364b..8cce241a5cf 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -207,7 +207,7 @@ var DaemonCmd = &cli.Command{ default: return fmt.Errorf("unrecognized profile type: %q", profile) } - network := strings.Split(string(buildconstants.BuildTypeString()), "+")[1] + network := strings.Split(buildconstants.BuildTypeString(), "+")[1] ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.NodeBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), From a80b86bacb36e74e87384230935f135f71c18ce8 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Wed, 4 Dec 2024 17:08:11 +0530 Subject: [PATCH 05/19] Update cmd/lotus-miner/run.go Co-authored-by: Rod Vagg --- cmd/lotus-miner/run.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go index 329eb9a5068..a0c77fac99f 100644 --- a/cmd/lotus-miner/run.go +++ b/cmd/lotus-miner/run.go @@ -57,12 +57,11 @@ var runCmd = &cli.Command{ } } - network := strings.Split(buildconstants.BuildTypeString(), "+")[1] ctx, _ := tag.New(lcli.DaemonContext(cctx), tag.Insert(metrics.Version, build.MinerBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "miner"), - tag.Insert(metrics.Network, network), + tag.Insert(metrics.Network, buildconstants.NetworkBundle), ) // Register all metric views if err := view.Register( From 1ab14eae2c03303ab2210bcadc44c98275c70cc9 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Wed, 4 Dec 2024 17:08:19 +0530 Subject: [PATCH 06/19] Update cmd/lotus/daemon.go Co-authored-by: Rod Vagg --- cmd/lotus/daemon.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 8cce241a5cf..547765fa816 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -207,12 +207,11 @@ var DaemonCmd = &cli.Command{ default: return fmt.Errorf("unrecognized profile type: %q", profile) } - network := strings.Split(buildconstants.BuildTypeString(), "+")[1] ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.NodeBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "chain"), - tag.Insert(metrics.Network, network), + tag.Insert(metrics.Network, buildconstants.NetworkBundle), ) // Register all metric views if err = view.Register( From 308036887224fc6759ba29b683bd2c413f537881 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Tue, 10 Dec 2024 07:46:21 +0530 Subject: [PATCH 07/19] chore: remove unused import --- cmd/lotus-miner/run.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go index a0c77fac99f..ea50981e645 100644 --- a/cmd/lotus-miner/run.go +++ b/cmd/lotus-miner/run.go @@ -4,7 +4,6 @@ import ( "fmt" _ "net/http/pprof" "os" - "strings" "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" From be5d79a88744f9482765a344d103eeb675cf3b57 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Wed, 11 Dec 2024 15:29:47 +0530 Subject: [PATCH 08/19] feat(metrics): add network tagging to context across multiple files --- blockstore/splitstore/splitstore.go | 10 +++++++++- node/modules/lp2p/rcmgr.go | 20 ++++++++++++++++---- node/rpc.go | 5 ++++- paychmgr/manager.go | 13 ++++++++++++- storage/paths/db_index.go | 14 ++++++++++++++ storage/wdpost/wdpost_changehandler.go | 11 +++++++++++ 6 files changed, 66 insertions(+), 7 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 7f5049caf04..e8418eb97ec 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -14,12 +14,14 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "go.opencensus.io/stats" + "go.opencensus.io/tag" "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" @@ -266,7 +268,13 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.txnViewsCond.L = &ss.txnViewsMx ss.txnSyncCond.L = &ss.txnSyncMx ss.chainSyncCond.L = &ss.chainSyncMx - ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + baseCtx := context.Background() + ctx, err := tag.New(baseCtx, tag.Insert(metrics.Network, buildconstants.NetworkBundle)) + if err != nil { + return nil, xerrors.Errorf("failed to create context with network tag: %w", err) + } + ss.ctx, ss.cancel = context.WithCancel(ctx) ss.reifyCond.L = &ss.reifyMx ss.reifyPend = make(map[cid.Cid]struct{}) diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 75a09068cc7..1d4a277b388 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -9,6 +9,10 @@ import ( "path/filepath" "sync" + "github.com/filecoin-project/lotus/build/buildconstants" + "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/network" @@ -21,10 +25,6 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/fx" - - "github.com/filecoin-project/lotus/metrics" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" ) var rcmgrMetricsOnce sync.Once @@ -179,6 +179,7 @@ type rcmgrMetrics struct{} func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -194,6 +195,7 @@ func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -209,6 +211,7 @@ func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -219,6 +222,7 @@ func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -229,46 +233,54 @@ func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { func (r rcmgrMetrics) AllowPeer(p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) stats.Record(ctx, metrics.RcmgrAllowPeer.M(1)) } func (r rcmgrMetrics) BlockPeer(p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) stats.Record(ctx, metrics.RcmgrBlockPeer.M(1)) } func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrAllowProto.M(1)) } func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrBlockProto.M(1)) } func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1)) } func (r rcmgrMetrics) AllowService(svc string) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrAllowSvc.M(1)) } func (r rcmgrMetrics) BlockService(svc string) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrBlockSvc.M(1)) } func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1)) } diff --git a/node/rpc.go b/node/rpc.go index ede1b924cd4..c9af4de3774 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" @@ -48,7 +49,9 @@ func ServeRPC(h http.Handler, id string, addr multiaddr.Multiaddr) (StopFunc, er Handler: h, ReadHeaderTimeout: 30 * time.Second, BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, id)) + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.APIInterface, id)) return ctx }, } diff --git a/paychmgr/manager.go b/paychmgr/manager.go index 97073801272..482c2c0b7f7 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -20,6 +20,10 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/lotus/build/buildconstants" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/tag" ) var log = logging.Logger("paych") @@ -70,6 +74,10 @@ type Manager struct { func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager { impl := &managerAPIImpl{StateManagerAPI: sm, PaychAPI: api} + + // Add network tag to context + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + return &Manager{ ctx: ctx, shutdown: shutdown, @@ -82,13 +90,16 @@ func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, // newManager is used by the tests to supply mocks func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + pm := &Manager{ store: pchstore, sa: &stateAccessor{sm: pchapi}, channels: make(map[string]*channelAccessor), pchapi: pchapi, } - pm.ctx, pm.shutdown = context.WithCancel(context.Background()) + pm.ctx, pm.shutdown = context.WithCancel(ctx) return pm, pm.Start() } diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index e6def455112..4c379619be1 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/journal/alerting" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/metrics" @@ -46,6 +47,12 @@ func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { } } +// addNetworkTag adds the network tag to the context for metrics +func addNetworkTag(ctx context.Context) context.Context { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + return ctx +} + func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { var sectorEntries []struct { @@ -120,6 +127,8 @@ func splitString(str string) []string { } func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { + ctx = addNetworkTag(ctx) + var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert { @@ -312,6 +321,8 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri } func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { + ctx = addNetworkTag(ctx) + retryWait := time.Millisecond * 20 retryReportHealth: _, err := dbi.harmonyDB.Exec(ctx, @@ -378,6 +389,7 @@ func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool { } func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { + ctx = addNetworkTag(ctx) if !dbi.checkFileType(ft) { return xerrors.Errorf("invalid filetype") @@ -678,6 +690,8 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface } func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) { + ctx = addNetworkTag(ctx) + var err error var spaceReq uint64 switch pathType { diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index ce58f148920..176562468ac 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -8,8 +8,11 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/tag" ) const ( @@ -51,6 +54,7 @@ func (ch *changeHandler) start() { } func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error { + ctx = addNetworkTag(ctx) // Get the current deadline period di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key()) if err != nil { @@ -210,6 +214,7 @@ func (p *proveHandler) run() { } func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { + ctx = addNetworkTag(ctx) // If the post window has expired, abort the current proof if p.current != nil && newTS.Height() >= p.current.di.Close { // Cancel the context on the current proof @@ -387,6 +392,7 @@ func (s *submitHandler) run() { // processHeadChange is called when the chain head changes func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) { + ctx = addNetworkTag(ctx) s.currentCtx = ctx s.currentTS = advance s.currentDI = di @@ -540,3 +546,8 @@ func NextDeadline(currentDeadline *dline.Info) *dline.Info { func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info { return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod(), miner.WPoStChallengeWindow(), miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff) } + +func addNetworkTag(ctx context.Context) context.Context { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + return ctx +} From 65a02f69dee6eb294789b10f2d6faea9f5a05c38 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Wed, 11 Dec 2024 15:37:52 +0530 Subject: [PATCH 09/19] refactor(imports): reorganize and clean up imports in rcmgr.go, manager.go, and wdpost_changehandler.go --- node/modules/lp2p/rcmgr.go | 9 +++++---- paychmgr/manager.go | 5 ++--- storage/wdpost/wdpost_changehandler.go | 3 ++- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 1d4a277b388..a297945ff94 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -9,10 +9,6 @@ import ( "path/filepath" "sync" - "github.com/filecoin-project/lotus/build/buildconstants" - "github.com/filecoin-project/lotus/metrics" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/network" @@ -25,6 +21,11 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/fx" + + "github.com/filecoin-project/lotus/build/buildconstants" + "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" ) var rcmgrMetricsOnce sync.Once diff --git a/paychmgr/manager.go b/paychmgr/manager.go index 482c2c0b7f7..a3ebd539deb 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" + "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -17,13 +18,11 @@ import ( "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/metrics" - "go.opencensus.io/tag" ) var log = logging.Logger("paych") diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index 176562468ac..dc631f45238 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "go.opencensus.io/tag" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/dline" @@ -12,7 +14,6 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" - "go.opencensus.io/tag" ) const ( From 775cfac1393f2e7134fa33e7b1a4aa5988315dec Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 12 Dec 2024 11:41:37 +0530 Subject: [PATCH 10/19] feat(metrics): implement AddNetworkTag function for consistent network tagging across multiple modules --- metrics/metrics.go | 6 ++++++ node/modules/lp2p/rcmgr.go | 22 +++++++++++----------- node/modules/paych.go | 3 ++- node/modules/storageminer.go | 4 +++- node/rpc.go | 3 +-- paychmgr/manager.go | 4 ---- storage/paths/db_index.go | 15 ++++----------- storage/wdpost/wdpost_changehandler.go | 13 ++----------- storage/wdpost/wdpost_sched.go | 1 - 9 files changed, 29 insertions(+), 42 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 71047cdbefd..75d4b29491f 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -11,6 +11,7 @@ import ( rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics" "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build/buildconstants" ) // Distribution @@ -851,3 +852,8 @@ func Timer(ctx context.Context, m *stats.Float64Measure) func() time.Duration { return time.Since(start) } } + +func AddNetworkTag(ctx context.Context) context.Context { + ctx, _ = tag.New(ctx, tag.Upsert(Network, buildconstants.NetworkBundle)) + return ctx +} diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index a297945ff94..83d297c23ab 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -196,7 +196,7 @@ func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -212,7 +212,7 @@ func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -223,7 +223,7 @@ func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -234,54 +234,54 @@ func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { func (r rcmgrMetrics) AllowPeer(p peer.ID) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) stats.Record(ctx, metrics.RcmgrAllowPeer.M(1)) } func (r rcmgrMetrics) BlockPeer(p peer.ID) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) stats.Record(ctx, metrics.RcmgrBlockPeer.M(1)) } func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrAllowProto.M(1)) } func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrBlockProto.M(1)) } func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1)) } func (r rcmgrMetrics) AllowService(svc string) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrAllowSvc.M(1)) } func (r rcmgrMetrics) BlockService(svc string) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrBlockSvc.M(1)) } func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1)) } diff --git a/node/modules/paych.go b/node/modules/paych.go index 4f93bbd6c55..f5cce6a5612 100644 --- a/node/modules/paych.go +++ b/node/modules/paych.go @@ -8,6 +8,7 @@ import ( "go.uber.org/fx" "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -17,7 +18,7 @@ import ( func NewManager(mctx helpers.MetricsCtx, lc fx.Lifecycle, sm stmgr.StateManagerAPI, pchstore *paychmgr.Store, api paychmgr.PaychAPI) *paychmgr.Manager { ctx := helpers.LifecycleCtx(mctx, lc) ctx, shutdown := context.WithCancel(ctx) - + ctx = metrics.AddNetworkTag(ctx) return paychmgr.NewManager(ctx, shutdown, sm, pchstore, api) } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index d965d59ebce..cb6f2b778ba 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -11,6 +11,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/jpillora/backoff" + "go.opencensus.io/tag" "go.uber.org/fx" "go.uber.org/multierr" "golang.org/x/xerrors" @@ -33,6 +34,7 @@ import ( "github.com/filecoin-project/lotus/chain/lf3" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" + "github.com/filecoin-project/lotus/metrics" lotusminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -289,7 +291,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func ) ctx := helpers.LifecycleCtx(mctx, lc) - + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr}) if err != nil { diff --git a/node/rpc.go b/node/rpc.go index c9af4de3774..bcf78799ff9 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -22,7 +22,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" @@ -50,7 +49,7 @@ func ServeRPC(h http.Handler, id string, addr multiaddr.Multiaddr) (StopFunc, er ReadHeaderTimeout: 30 * time.Second, BaseContext: func(listener net.Listener) context.Context { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) ctx, _ = tag.New(ctx, tag.Upsert(metrics.APIInterface, id)) return ctx }, diff --git a/paychmgr/manager.go b/paychmgr/manager.go index a3ebd539deb..83f2de36cf1 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -73,10 +73,6 @@ type Manager struct { func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager { impl := &managerAPIImpl{StateManagerAPI: sm, PaychAPI: api} - - // Add network tag to context - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) - return &Manager{ ctx: ctx, shutdown: shutdown, diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 4c379619be1..2136d62a5ae 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -17,7 +17,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/journal/alerting" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/metrics" @@ -47,12 +46,6 @@ func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { } } -// addNetworkTag adds the network tag to the context for metrics -func addNetworkTag(ctx context.Context) context.Context { - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) - return ctx -} - func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { var sectorEntries []struct { @@ -127,7 +120,7 @@ func splitString(str string) []string { } func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { - ctx = addNetworkTag(ctx) + ctx = metrics.AddNetworkTag(ctx) var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) @@ -321,7 +314,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri } func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - ctx = addNetworkTag(ctx) + ctx = metrics.AddNetworkTag(ctx) retryWait := time.Millisecond * 20 retryReportHealth: @@ -389,7 +382,7 @@ func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool { } func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { - ctx = addNetworkTag(ctx) + ctx = metrics.AddNetworkTag(ctx) if !dbi.checkFileType(ft) { return xerrors.Errorf("invalid filetype") @@ -690,7 +683,7 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface } func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) { - ctx = addNetworkTag(ctx) + ctx = metrics.AddNetworkTag(ctx) var err error var spaceReq uint64 diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index dc631f45238..038a21dce9a 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -4,13 +4,10 @@ import ( "context" "sync" - "go.opencensus.io/tag" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/dline" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" @@ -55,7 +52,6 @@ func (ch *changeHandler) start() { } func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error { - ctx = addNetworkTag(ctx) // Get the current deadline period di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key()) if err != nil { @@ -215,7 +211,7 @@ func (p *proveHandler) run() { } func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { - ctx = addNetworkTag(ctx) + ctx = metrics.AddNetworkTag(ctx) // If the post window has expired, abort the current proof if p.current != nil && newTS.Height() >= p.current.di.Close { // Cancel the context on the current proof @@ -393,7 +389,7 @@ func (s *submitHandler) run() { // processHeadChange is called when the chain head changes func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) { - ctx = addNetworkTag(ctx) + ctx = metrics.AddNetworkTag(ctx) s.currentCtx = ctx s.currentTS = advance s.currentDI = di @@ -547,8 +543,3 @@ func NextDeadline(currentDeadline *dline.Info) *dline.Info { func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info { return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod(), miner.WPoStChallengeWindow(), miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff) } - -func addNetworkTag(ctx context.Context) context.Context { - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) - return ctx -} diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index bbf4596fe30..9c9685eb661 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -190,7 +190,6 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { } ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange") - s.update(ctx, nil, chg.Val) span.End() From 1244845faba584825e174b5c5bcccc64cd1e10cf Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 12 Dec 2024 11:46:44 +0530 Subject: [PATCH 11/19] feat(metrics): update context tagging for network metrics in StorageAttach method --- storage/paths/db_index.go | 1 - storage/paths/local.go | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 2136d62a5ae..0693eef4154 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -120,7 +120,6 @@ func splitString(str string) []string { } func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { - ctx = metrics.AddNetworkTag(ctx) var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) diff --git a/storage/paths/local.go b/storage/paths/local.go index 07223ad5317..8abb376c958 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -12,13 +12,16 @@ import ( "time" "github.com/ipfs/go-cid" + "go.opencensus.io/tag" "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/proof" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/lib/result" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -238,7 +241,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { if err != nil { return err } - + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: meta.ID, URLs: st.urls, @@ -331,6 +334,8 @@ func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMiss continue } + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: id, URLs: st.urls, From 01e8501c7f806a71f441b090ec941e678d149a8c Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Mon, 16 Dec 2024 09:31:41 +0530 Subject: [PATCH 12/19] refactor(metrics): replace buildconstants with AddNetworkTag for consistent network tagging across multiple files --- CHANGELOG.md | 1 - blockstore/splitstore/splitstore.go | 8 ++------ cmd/lotus-miner/run.go | 3 +-- cmd/lotus/daemon.go | 2 +- node/modules/lp2p/rcmgr.go | 3 +-- node/modules/storageminer.go | 3 +-- paychmgr/manager.go | 4 +--- storage/paths/db_index.go | 1 - storage/paths/local.go | 8 ++++---- 9 files changed, 11 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90ad77d4d97..bcc82c8b5a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,7 +39,6 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing - Return a consistent error when encountering null rounds in ETH RPC method calls. ([filecoin-project/lotus#12655](https://github.com/filecoin-project/lotus/pull/12655)) - Correct erroneous sector QAP-calculation upon sector extension in lotus-miner cli. ([filecoin-project/lotus#12720](https://github.com/filecoin-project/lotus/pull/12720)) - ## 📝 Changelog For the full set of changes since the last stable release: diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index e8418eb97ec..a3b93599f00 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -14,14 +14,12 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" bstore "github.com/filecoin-project/lotus/blockstore" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" @@ -270,10 +268,8 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.chainSyncCond.L = &ss.chainSyncMx baseCtx := context.Background() - ctx, err := tag.New(baseCtx, tag.Insert(metrics.Network, buildconstants.NetworkBundle)) - if err != nil { - return nil, xerrors.Errorf("failed to create context with network tag: %w", err) - } + ctx := metrics.AddNetworkTag(baseCtx) + ss.ctx, ss.cancel = context.WithCancel(ctx) ss.reifyCond.L = &ss.reifyMx diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go index ea50981e645..be670287e1a 100644 --- a/cmd/lotus-miner/run.go +++ b/cmd/lotus-miner/run.go @@ -16,7 +16,6 @@ import ( "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/build/buildconstants" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" @@ -60,8 +59,8 @@ var runCmd = &cli.Command{ tag.Insert(metrics.Version, build.MinerBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "miner"), - tag.Insert(metrics.Network, buildconstants.NetworkBundle), ) + ctx = metrics.AddNetworkTag(ctx) // Register all metric views if err := view.Register( metrics.MinerNodeViews..., diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 547765fa816..97d2294b79b 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -211,8 +211,8 @@ var DaemonCmd = &cli.Command{ tag.Insert(metrics.Version, build.NodeBuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit), tag.Insert(metrics.NodeType, "chain"), - tag.Insert(metrics.Network, buildconstants.NetworkBundle), ) + ctx = metrics.AddNetworkTag(ctx) // Register all metric views if err = view.Register( metrics.ChainNodeViews..., diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 83d297c23ab..337eb21a651 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -22,7 +22,6 @@ import ( "go.opencensus.io/tag" "go.uber.org/fx" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" @@ -180,7 +179,7 @@ type rcmgrMetrics struct{} func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index cb6f2b778ba..7e9482db67f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/jpillora/backoff" - "go.opencensus.io/tag" "go.uber.org/fx" "go.uber.org/multierr" "golang.org/x/xerrors" @@ -291,7 +290,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func ) ctx := helpers.LifecycleCtx(mctx, lc) - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr}) if err != nil { diff --git a/paychmgr/manager.go b/paychmgr/manager.go index 83f2de36cf1..13876e1d490 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" - "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -18,7 +17,6 @@ import ( "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" @@ -86,7 +84,7 @@ func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, // newManager is used by the tests to supply mocks func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) { ctx := context.Background() - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) pm := &Manager{ store: pchstore, diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 0693eef4154..c759f35059d 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -120,7 +120,6 @@ func splitString(str string) []string { } func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { - var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert { diff --git a/storage/paths/local.go b/storage/paths/local.go index 8abb376c958..fc5699323f2 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -12,14 +12,12 @@ import ( "time" "github.com/ipfs/go-cid" - "go.opencensus.io/tag" "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/proof" - "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/lib/result" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/storage/sealer/fsutil" @@ -241,7 +239,9 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { if err != nil { return err } - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + + ctx = metrics.AddNetworkTag(ctx) + err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: meta.ID, URLs: st.urls, @@ -334,7 +334,7 @@ func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMiss continue } - ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx = metrics.AddNetworkTag(ctx) err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: id, From fc56efa66ed3b652d6bab6718b07506dd6819b7a Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Tue, 17 Dec 2024 09:34:25 +0530 Subject: [PATCH 13/19] chore(metrics): remove AddNetworkTag calls from multiple files and update CHANGELOG to reflect network name reporting in metrics --- CHANGELOG.md | 2 +- storage/paths/db_index.go | 3 --- storage/paths/local.go | 5 ----- storage/wdpost/wdpost_changehandler.go | 3 --- 4 files changed, 1 insertion(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bcc82c8b5a2..4650f41f067 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ # UNRELEASED - Add json output of tipsets to `louts chain list`. ([filecoin-project/lotus#12691](https://github.com/filecoin-project/lotus/pull/12691)) +- Lotus now reports the network name in the metrics. ([filecoin-project/lotus#12733](https://github.com/filecoin-project/lotus/pull/12733)) # UNRELEASED v.1.32.0 @@ -28,7 +29,6 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing - Return a "data" field on the "error" returned from RPC when `eth_call` and `eth_estimateGas` APIs encounter `execution reverted` errors. This is a standard expectation of Ethereum RPC tooling and may improve compatibility in some cases. ([filecoin-project/lotus#12553](https://github.com/filecoin-project/lotus/pull/12553)) - Improve ETH-filter performance for nodes serving many clients. ([filecoin-project/lotus#12603](https://github.com/filecoin-project/lotus/pull/12603)) - Implement F3 utility CLIs to list the power table for a given instance and sum the proportional power of a set of actors that participate in a given instance. ([filecoin-project/lotus#12698](https://github.com/filecoin-project/lotus/pull/12698)) -- Lotus now reports the network name in the metrics. ([filecoin-project/lotus#12733](https://github.com/filecoin-project/lotus/pull/12733)) ## Bug Fixes diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index c759f35059d..9be39b40652 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -312,7 +312,6 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri } func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - ctx = metrics.AddNetworkTag(ctx) retryWait := time.Millisecond * 20 retryReportHealth: @@ -380,7 +379,6 @@ func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool { } func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { - ctx = metrics.AddNetworkTag(ctx) if !dbi.checkFileType(ft) { return xerrors.Errorf("invalid filetype") @@ -681,7 +679,6 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface } func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) { - ctx = metrics.AddNetworkTag(ctx) var err error var spaceReq uint64 diff --git a/storage/paths/local.go b/storage/paths/local.go index fc5699323f2..07223ad5317 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -19,7 +19,6 @@ import ( "github.com/filecoin-project/go-state-types/proof" "github.com/filecoin-project/lotus/lib/result" - "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -240,8 +239,6 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { return err } - ctx = metrics.AddNetworkTag(ctx) - err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: meta.ID, URLs: st.urls, @@ -334,8 +331,6 @@ func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMiss continue } - ctx = metrics.AddNetworkTag(ctx) - err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: id, URLs: st.urls, diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index 038a21dce9a..ce58f148920 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -10,7 +10,6 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/metrics" ) const ( @@ -211,7 +210,6 @@ func (p *proveHandler) run() { } func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { - ctx = metrics.AddNetworkTag(ctx) // If the post window has expired, abort the current proof if p.current != nil && newTS.Height() >= p.current.di.Close { // Cancel the context on the current proof @@ -389,7 +387,6 @@ func (s *submitHandler) run() { // processHeadChange is called when the chain head changes func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) { - ctx = metrics.AddNetworkTag(ctx) s.currentCtx = ctx s.currentTS = advance s.currentDI = di From b678818e5b557bbe596659c4778e4aeadd562bdb Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 17 Dec 2024 18:19:02 +1100 Subject: [PATCH 14/19] Update CHANGELOG.md --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4650f41f067..4534ebc303a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,8 +30,6 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing - Improve ETH-filter performance for nodes serving many clients. ([filecoin-project/lotus#12603](https://github.com/filecoin-project/lotus/pull/12603)) - Implement F3 utility CLIs to list the power table for a given instance and sum the proportional power of a set of actors that participate in a given instance. ([filecoin-project/lotus#12698](https://github.com/filecoin-project/lotus/pull/12698)) -## Bug Fixes - ## 🐛 Bug Fix Highlights - Add logic to check if the miner's owner address is delegated (f4 address). If it is delegated, the `lotus-shed sectors termination-estimate` command now sends the termination state call using the worker ID. This fix resolves the issue where termination-estimate did not function correctly for miners with delegated owner addresses. ([filecoin-project/lotus#12569](https://github.com/filecoin-project/lotus/pull/12569)) - The Lotus Miner will now always mine on the latest chain head returned by lotus, even if that head has less "weight" than the previously seen head. This is necessary because F3 may end up finalizing a tipset with a lower weight, although this situation should be rare on the Filecoin mainnet. ([filecoin-project/lotus#12659](https://github.com/filecoin-project/lotus/pull/12659)) and ([filecoin-project/lotus#12690](https://github.com/filecoin-project/lotus/pull/12690)) From 729d0e975e518b54fbb3d2c4d09f11e95748cdbc Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 17 Dec 2024 18:19:16 +1100 Subject: [PATCH 15/19] Apply suggestions from code review --- storage/paths/db_index.go | 1 - storage/wdpost/wdpost_sched.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 9be39b40652..5861e81765d 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -679,7 +679,6 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface } func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) { - var err error var spaceReq uint64 switch pathType { diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index 9c9685eb661..bbf4596fe30 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -190,6 +190,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { } ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange") + s.update(ctx, nil, chg.Val) span.End() From 58bbd4f0cca6598baba8551133a565831b63527f Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 17 Dec 2024 18:19:31 +1100 Subject: [PATCH 16/19] Update storage/paths/db_index.go --- storage/paths/db_index.go | 1 - 1 file changed, 1 deletion(-) diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 5861e81765d..e6def455112 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -312,7 +312,6 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri } func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { - retryWait := time.Millisecond * 20 retryReportHealth: _, err := dbi.harmonyDB.Exec(ctx, From 1e2fe5f1df3df750cc5ad4a4aa14d32df60f1249 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 17 Dec 2024 18:21:32 +1100 Subject: [PATCH 17/19] Update CHANGELOG.md --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4534ebc303a..0ba3d736d90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ # UNRELEASED - Add json output of tipsets to `louts chain list`. ([filecoin-project/lotus#12691](https://github.com/filecoin-project/lotus/pull/12691)) +- Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768) +- During a network upgrade, log migration progress every 2 seconds so they are more helpful and informative. The `LOTUS_MIGRATE_PROGRESS_LOG_SECONDS` environment variable can be used to change this if needed. ([filecoin-project/lotus#12732](https://github.com/filecoin-project/lotus/pull/12732)) +- Add F3GetCertificate & F3GetLatestCertificate to the gateway. ([filecoin-project/lotus#12778](https://github.com/filecoin-project/lotus/pull/12778)) - Lotus now reports the network name in the metrics. ([filecoin-project/lotus#12733](https://github.com/filecoin-project/lotus/pull/12733)) # UNRELEASED v.1.32.0 From 5249bf53b85f8bc92ea9112b5c9322a908da6987 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 17 Dec 2024 18:55:50 +1100 Subject: [PATCH 18/19] chore: plumb contexts wherever possible --- chain/exchange/server.go | 5 ++++- chain/gen/gen.go | 2 +- chain/gen/genesis/genesis.go | 2 +- chain/index/ddls_test.go | 27 ++++++++++++++------------- chain/index/helpers.go | 2 +- chain/index/indexer.go | 4 ++-- chain/index/read_test.go | 2 +- chain/messagepool/check.go | 2 +- chain/messagepool/messagepool.go | 4 ++-- chain/stmgr/stmgr.go | 5 ++--- chain/store/index_test.go | 2 +- chain/store/store.go | 4 ++-- chain/store/store_test.go | 8 ++++---- chain/sync.go | 6 +++++- cmd/lotus-bench/import.go | 2 +- cmd/lotus-shed/balances.go | 4 ++-- cmd/lotus-shed/deal-label.go | 2 +- cmd/lotus-shed/export.go | 2 +- cmd/lotus-shed/gas-estimation.go | 4 ++-- cmd/lotus-shed/genesis-verify.go | 2 +- cmd/lotus-shed/invariants.go | 2 +- cmd/lotus-shed/migrations.go | 2 +- cmd/lotus-shed/miner-peerid.go | 2 +- cmd/lotus-shed/miner-types.go | 2 +- cmd/lotus-shed/msig.go | 2 +- cmd/lotus-shed/pruning.go | 2 +- cmd/lotus-shed/state-stats.go | 2 +- cmd/lotus-shed/terminations.go | 2 +- cmd/lotus-sim/simulation/node.go | 2 +- cmd/lotus/daemon.go | 2 +- conformance/driver.go | 2 +- node/modules/chain.go | 2 +- node/modules/chainindex.go | 2 +- 33 files changed, 62 insertions(+), 55 deletions(-) diff --git a/chain/exchange/server.go b/chain/exchange/server.go index e8b2a414e69..09e8d3607ce 100644 --- a/chain/exchange/server.go +++ b/chain/exchange/server.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" ) // server implements exchange.Server. It services requests for the @@ -35,7 +36,9 @@ func NewServer(cs *store.ChainStore) Server { // HandleStream implements Server.HandleStream. Refer to the godocs there. func (s *server) HandleStream(stream inet.Stream) { - ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream") + ctx := context.Background() + ctx = metrics.AddNetworkTag(ctx) + ctx, span := trace.StartSpan(ctx, "chainxchg.HandleStream") defer span.End() defer stream.Close() //nolint:errcheck diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 6747ff4098a..64ba2be2e80 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -239,7 +239,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS return nil, xerrors.Errorf("make genesis block failed: %w", err) } - cs := store.NewChainStore(bs, bs, ds, filcns.Weight, j) + cs := store.NewChainStore(context.TODO(), bs, bs, ds, filcns.Weight, j) genfb := &types.FullBlock{Header: genb.Genesis} gents := store.NewFullTipSet([]*types.FullBlock{genfb}) diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 0d41b2c9f89..9bf51f699b6 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -574,7 +574,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto } // temp chainstore - cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, j) + cs := store.NewChainStore(ctx, bs, bs, datastore.NewMapDatastore(), nil, j) // Verify PreSealed Data stateroot, err = VerifyPreSealedData(ctx, cs, sys, stateroot, template, keyIDs, template.NetworkVersion) diff --git a/chain/index/ddls_test.go b/chain/index/ddls_test.go index e71db1a8cdf..b5d91874f53 100644 --- a/chain/index/ddls_test.go +++ b/chain/index/ddls_test.go @@ -1,6 +1,7 @@ package index import ( + "context" "database/sql" "testing" @@ -16,7 +17,7 @@ const ( ) func TestHasRevertedEventsInTipsetStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // running on empty DB should return false @@ -71,7 +72,7 @@ func TestHasRevertedEventsInTipsetStmt(t *testing.T) { } func TestGetNonRevertedTipsetCountStmts(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // running on empty DB should return 0 @@ -145,7 +146,7 @@ func TestGetNonRevertedTipsetCountStmts(t *testing.T) { } func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Insert a tipset message @@ -268,7 +269,7 @@ func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) { require.Equal(t, []byte("value3"), entries[0].value) } func TestUpdateTipsetToNonRevertedStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // insert a reverted tipset @@ -296,7 +297,7 @@ func TestUpdateTipsetToNonRevertedStmt(t *testing.T) { } func TestHasNullRoundAtHeightStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // running on empty DB should return true @@ -317,7 +318,7 @@ func TestHasNullRoundAtHeightStmt(t *testing.T) { } func TestHasTipsetStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // running on empty DB should return false @@ -340,7 +341,7 @@ func TestHasTipsetStmt(t *testing.T) { } func TestUpdateEventsToRevertedStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Insert a non-reverted tipset @@ -390,7 +391,7 @@ func TestUpdateEventsToRevertedStmt(t *testing.T) { } func TestCountTipsetsAtHeightStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Test empty DB @@ -438,7 +439,7 @@ func TestCountTipsetsAtHeightStmt(t *testing.T) { } func TestNonRevertedTipsetAtHeightStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Test empty DB @@ -498,7 +499,7 @@ func TestNonRevertedTipsetAtHeightStmt(t *testing.T) { } func TestMinNonRevertedHeightStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Test empty DB @@ -557,7 +558,7 @@ func verifyMinNonRevertedHeightStmt(t *testing.T, s *SqliteIndexer, expectedMinH } func TestGetMsgIdForMsgCidAndTipsetStmt(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Insert a non-reverted tipset @@ -603,7 +604,7 @@ func TestGetMsgIdForMsgCidAndTipsetStmt(t *testing.T) { } func TestForeignKeyCascadeDelete(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) // Insert a tipset @@ -647,7 +648,7 @@ func TestForeignKeyCascadeDelete(t *testing.T) { } func TestInsertTipsetMessage(t *testing.T) { - s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0) require.NoError(t, err) ts := tipsetMessage{ diff --git a/chain/index/helpers.go b/chain/index/helpers.go index a4db495c99e..601bb9b0565 100644 --- a/chain/index/helpers.go +++ b/chain/index/helpers.go @@ -41,7 +41,7 @@ func PopulateFromSnapshot(ctx context.Context, path string, cs ChainStore) error } } - si, err := NewSqliteIndexer(path, cs, 0, false, 0) + si, err := NewSqliteIndexer(ctx, path, cs, 0, false, 0) if err != nil { return xerrors.Errorf("failed to create sqlite indexer: %w", err) } diff --git a/chain/index/indexer.go b/chain/index/indexer.go index 7cee575a6df..8643ce76571 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -89,7 +89,7 @@ type SqliteIndexer struct { writerLk sync.Mutex } -func NewSqliteIndexer(path string, cs ChainStore, gcRetentionEpochs int64, reconcileEmptyIndex bool, +func NewSqliteIndexer(ctx context.Context, path string, cs ChainStore, gcRetentionEpochs int64, reconcileEmptyIndex bool, maxReconcileTipsets uint64) (si *SqliteIndexer, err error) { if gcRetentionEpochs != 0 && gcRetentionEpochs < builtin.EpochsInDay { @@ -101,7 +101,7 @@ func NewSqliteIndexer(path string, cs ChainStore, gcRetentionEpochs int64, recon return nil, xerrors.Errorf("failed to setup message index db: %w", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer func() { if err != nil { diff --git a/chain/index/read_test.go b/chain/index/read_test.go index 4f8b4699c07..7771315c01b 100644 --- a/chain/index/read_test.go +++ b/chain/index/read_test.go @@ -92,7 +92,7 @@ func setupWithHeadIndexed(t *testing.T, headHeight abi.ChainEpoch, rng *pseudo.R d := newDummyChainStore() d.SetHeaviestTipSet(head) - s, err := NewSqliteIndexer(":memory:", d, 0, false, 0) + s, err := NewSqliteIndexer(context.Background(), ":memory:", d, 0, false, 0) require.NoError(t, err) insertHead(t, s, head, headHeight) diff --git a/chain/messagepool/check.go b/chain/messagepool/check.go index fdec910c4ea..cf85a5f5470 100644 --- a/chain/messagepool/check.go +++ b/chain/messagepool/check.go @@ -122,7 +122,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, if len(curTs.Blocks()) > 0 { baseFee = curTs.Blocks()[0].ParentBaseFee } else { - baseFee, err = mp.api.ChainComputeBaseFee(context.Background(), curTs) + baseFee, err = mp.api.ChainComputeBaseFee(ctx, curTs) if err != nil { return nil, xerrors.Errorf("error computing basefee: %w", err) } diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 80dd1584987..b2d523e1ecd 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -419,7 +419,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra // enable initial prunes mp.pruneCooldown <- struct{}{} - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(ctx) // load the current tipset and subscribe to head changes _before_ loading local messages mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { @@ -664,7 +664,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(ctx context.Context, m *types.SignedMe baseFee = curTs.Blocks()[0].ParentBaseFee } else { var err error - baseFee, err = mp.api.ChainComputeBaseFee(context.TODO(), curTs) + baseFee, err = mp.api.ChainComputeBaseFee(ctx, curTs) if err != nil { return false, xerrors.Errorf("computing basefee: %w", err) } diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 5b227fe922e..4307af9ced7 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -269,9 +269,8 @@ func cidsToKey(cids []cid.Cid) string { // pre-migration functions to run ahead of network upgrades. // // This method is not safe to invoke from multiple threads or concurrently with Stop. -func (sm *StateManager) Start(context.Context) error { - var ctx context.Context - ctx, sm.cancel = context.WithCancel(context.Background()) +func (sm *StateManager) Start(ctx context.Context) error { + ctx, sm.cancel = context.WithCancel(ctx) sm.shutdown = make(chan struct{}) go sm.preMigrationWorker(ctx) return nil diff --git a/chain/store/index_test.go b/chain/store/index_test.go index 90146d68b0e..f47779e949b 100644 --- a/chain/store/index_test.go +++ b/chain/store/index_test.go @@ -35,7 +35,7 @@ func TestIndexSeeks(t *testing.T) { ctx := context.TODO() nbs := blockstore.NewMemorySync() - cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil) defer cs.Close() //nolint:errcheck _, _, err = cs.Import(ctx, bytes.NewReader(gencar)) diff --git a/chain/store/store.go b/chain/store/store.go index 02c5c779ef3..85c7fd92ff5 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -133,7 +133,7 @@ type ChainStore struct { wg sync.WaitGroup } -func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, weight WeightFunc, j journal.Journal) *ChainStore { +func NewChainStore(ctx context.Context, chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, weight WeightFunc, j journal.Journal) *ChainStore { c, _ := arc.NewARC[cid.Cid, mmCids](DefaultMsgMetaCacheSize) tsc, _ := arc.NewARC[types.TipSetKey, *types.TipSet](DefaultTipSetCacheSize) if j == nil { @@ -191,7 +191,7 @@ func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dsto hcmetric := func(rev, app []*types.TipSet) error { for _, r := range app { - stats.Record(context.Background(), metrics.ChainNodeHeight.M(int64(r.Height()))) + stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height()))) } return nil } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 5329371ae33..5b6e2886cb6 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -74,7 +74,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(context.Background(), bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck b.ResetTimer() @@ -109,7 +109,7 @@ func TestChainExportImport(t *testing.T) { } nbs := blockstore.NewMemorySync() - cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil) defer cs.Close() //nolint:errcheck root, _, err := cs.Import(context.TODO(), buf) @@ -144,7 +144,7 @@ func TestChainImportTipsetKeyCid(t *testing.T) { } nbs := blockstore.NewMemorySync() - cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil) defer cs.Close() //nolint:errcheck root, _, err := cs.Import(ctx, buf) @@ -190,7 +190,7 @@ func TestChainExportImportFull(t *testing.T) { nbs := blockstore.NewMemorySync() ds := datastore.NewMapDatastore() - cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, ds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck root, _, err := cs.Import(context.TODO(), buf) diff --git a/chain/sync.go b/chain/sync.go index 1ee95af0e29..012fe9163ec 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -163,7 +163,9 @@ func NewSyncer(ds dtypes.MetadataDS, } func (syncer *Syncer) Start() { - tickerCtx, tickerCtxCancel := context.WithCancel(context.Background()) + ctx := context.Background() + ctx = metrics.AddNetworkTag(ctx) + tickerCtx, tickerCtxCancel := context.WithCancel(ctx) syncer.syncmgr.Start() syncer.tickerCtxCancel = tickerCtxCancel @@ -205,6 +207,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { }() ctx := context.Background() + ctx = metrics.AddNetworkTag(ctx) if fts == nil { log.Errorf("got nil tipset in InformNewHead") return false @@ -302,6 +305,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { blockstore := bstore.NewMemory() cst := cbor.NewCborStore(blockstore) ctx := context.Background() + ctx = metrics.AddNetworkTag(ctx) var bcids, scids []cid.Cid for _, m := range fblk.BlsMessages { diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 46f2411bf86..e8504656d89 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -224,7 +224,7 @@ var importBenchCmd = &cli.Command{ } metadataDs := datastore.NewMapDatastore() - cs := store.NewChainStore(bs, bs, metadataDs, filcns.Weight, nil) + cs := store.NewChainStore(cctx.Context, bs, bs, metadataDs, filcns.Weight, nil) defer cs.Close() //nolint:errcheck // TODO: We need to supply the actual beacon after v14 diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 48030c58ccd..59afb108598 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -515,7 +515,7 @@ var chainBalanceStateCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) @@ -739,7 +739,7 @@ var chainPledgeCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-shed/deal-label.go b/cmd/lotus-shed/deal-label.go index 417d1370193..9b73b684cd1 100644 --- a/cmd/lotus-shed/deal-label.go +++ b/cmd/lotus-shed/deal-label.go @@ -70,7 +70,7 @@ var dealLabelCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index e6d0c4e056f..ff4bb5198bc 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -113,7 +113,7 @@ var exportChainCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, nil, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, nil, nil) defer cs.Close() //nolint:errcheck if err := cs.Load(context.Background()); err != nil { diff --git a/cmd/lotus-shed/gas-estimation.go b/cmd/lotus-shed/gas-estimation.go index b1c61b62f2c..8e9c8c11d7c 100644 --- a/cmd/lotus-shed/gas-estimation.go +++ b/cmd/lotus-shed/gas-estimation.go @@ -103,7 +103,7 @@ var gasTraceCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(proofsffi.ProofVerifier), filcns.DefaultUpgradeSchedule(), @@ -200,7 +200,7 @@ var replayOfflineCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(proofsffi.ProofVerifier), filcns.DefaultUpgradeSchedule(), diff --git a/cmd/lotus-shed/genesis-verify.go b/cmd/lotus-shed/genesis-verify.go index 4db5eb1436b..ef24c60e77a 100644 --- a/cmd/lotus-shed/genesis-verify.go +++ b/cmd/lotus-shed/genesis-verify.go @@ -53,7 +53,7 @@ var genesisVerifyCmd = &cli.Command{ } bs := blockstore.NewMemory() - cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), filcns.Weight, nil) + cs := store.NewChainStore(cctx.Context, bs, bs, datastore.NewMapDatastore(), filcns.Weight, nil) defer cs.Close() //nolint:errcheck cf := cctx.Args().Get(0) diff --git a/cmd/lotus-shed/invariants.go b/cmd/lotus-shed/invariants.go index bc994bbe633..1b2105ebb2a 100644 --- a/cmd/lotus-shed/invariants.go +++ b/cmd/lotus-shed/invariants.go @@ -125,7 +125,7 @@ var invariantsCmd = &cli.Command{ }() bs := ss - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(proofsffi.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, nil) diff --git a/cmd/lotus-shed/migrations.go b/cmd/lotus-shed/migrations.go index be0ef834c4d..a845bba8cbd 100644 --- a/cmd/lotus-shed/migrations.go +++ b/cmd/lotus-shed/migrations.go @@ -174,7 +174,7 @@ var migrationsCmd = &cli.Command{ }() bs := ss - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck // Note: we use a map datastore for the metadata to avoid writing / using cached migration results in the metadata store diff --git a/cmd/lotus-shed/miner-peerid.go b/cmd/lotus-shed/miner-peerid.go index e430637976c..c96ed567fc3 100644 --- a/cmd/lotus-shed/miner-peerid.go +++ b/cmd/lotus-shed/miner-peerid.go @@ -79,7 +79,7 @@ var minerPeeridCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-shed/miner-types.go b/cmd/lotus-shed/miner-types.go index 822d037aa26..251bfa5bf7e 100644 --- a/cmd/lotus-shed/miner-types.go +++ b/cmd/lotus-shed/miner-types.go @@ -76,7 +76,7 @@ var minerTypesCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-shed/msig.go b/cmd/lotus-shed/msig.go index ccc932c93ff..6bffdf61d40 100644 --- a/cmd/lotus-shed/msig.go +++ b/cmd/lotus-shed/msig.go @@ -88,7 +88,7 @@ var multisigGetAllCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index c0bd453b145..be711c72600 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -166,7 +166,7 @@ var stateTreePruneCmd = &cli.Command{ return nil } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck if err := cs.Load(context.Background()); err != nil { diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 88b21f40767..6210b67ec05 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -284,7 +284,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error) } } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) if err := cs.Load(ctx); err != nil { return nil, xerrors.Errorf("failed to load chain store: %w", err) } diff --git a/cmd/lotus-shed/terminations.go b/cmd/lotus-shed/terminations.go index 563c1ba3a77..2849a340e23 100644 --- a/cmd/lotus-shed/terminations.go +++ b/cmd/lotus-shed/terminations.go @@ -79,7 +79,7 @@ var terminationsCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil) + cs := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, nil) defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-sim/simulation/node.go b/cmd/lotus-sim/simulation/node.go index cda3e69d839..7ab042830ce 100644 --- a/cmd/lotus-sim/simulation/node.go +++ b/cmd/lotus-sim/simulation/node.go @@ -62,7 +62,7 @@ func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) { } return &Node{ repo: lr, - Chainstore: store.NewChainStore(bs, bs, ds, filcns.Weight, nil), + Chainstore: store.NewChainStore(ctx, bs, bs, ds, filcns.Weight, nil), MetadataDS: ds, Blockstore: bs, }, err diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 97d2294b79b..e1b2103ae1e 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -561,7 +561,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return xerrors.Errorf("failed to open journal: %w", err) } - cst := store.NewChainStore(bs, bs, mds, filcns.Weight, j) + cst := store.NewChainStore(ctx, bs, bs, mds, filcns.Weight, j) defer cst.Close() //nolint:errcheck log.Infof("importing chain from %s...", fname) diff --git a/conformance/driver.go b/conformance/driver.go index 15ae567063a..94e2d1ce3ef 100644 --- a/conformance/driver.go +++ b/conformance/driver.go @@ -107,7 +107,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params tipset = params.Tipset syscalls = vm.Syscalls(proofsffi.ProofVerifier) - cs = store.NewChainStore(bs, bs, ds, filcns.Weight, nil) + cs = store.NewChainStore(context.TODO(), bs, bs, ds, filcns.Weight, nil) tse = consensus.NewTipSetExecutor(filcns.RewardFunc) sm, err = stmgr.NewStateManager(cs, tse, syscalls, filcns.DefaultUpgradeSchedule(), nil, ds, nil) ) diff --git a/node/modules/chain.go b/node/modules/chain.go index cf088283ea5..c80a7cd091f 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -83,7 +83,7 @@ func ChainStore(lc fx.Lifecycle, us stmgr.UpgradeSchedule, j journal.Journal) (*store.ChainStore, error) { - chain := store.NewChainStore(cbs, sbs, ds, weight, j) + chain := store.NewChainStore(mctx, cbs, sbs, ds, weight, j) if err := chain.Load(helpers.LifecycleCtx(mctx, lc)); err != nil { return nil, xerrors.Errorf("loading chain state from disk: %w", err) diff --git a/node/modules/chainindex.go b/node/modules/chainindex.go index d2307a77600..abfacd66331 100644 --- a/node/modules/chainindex.go +++ b/node/modules/chainindex.go @@ -36,7 +36,7 @@ func ChainIndexer(cfg config.ChainIndexerConfig) func(lc fx.Lifecycle, mctx help } dbPath := filepath.Join(chainIndexPath, index.DefaultDbFilename) - chainIndexer, err := index.NewSqliteIndexer(dbPath, cs, cfg.GCRetentionEpochs, cfg.ReconcileEmptyIndex, cfg.MaxReconcileTipsets) + chainIndexer, err := index.NewSqliteIndexer(mctx, dbPath, cs, cfg.GCRetentionEpochs, cfg.ReconcileEmptyIndex, cfg.MaxReconcileTipsets) if err != nil { return nil, err } From 57ab837de8a268a163a627449a81175be86bf4c2 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 17 Dec 2024 19:14:31 +1100 Subject: [PATCH 19/19] fixup! chore: plumb contexts wherever possible --- chain/sync_manager.go | 4 ++-- chain/sync_manager_test.go | 2 +- node/builder_chain.go | 7 ++++++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/chain/sync_manager.go b/chain/sync_manager.go index b2bf3bba8a6..5b1d2b23617 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -112,8 +112,8 @@ type workerStatus struct { // sync manager interface -func NewSyncManager(sync SyncFunc) SyncManager { - ctx, cancel := context.WithCancel(context.Background()) +func NewSyncManager(ctx context.Context, sync SyncFunc) SyncManager { + ctx, cancel := context.WithCancel(ctx) return &syncManager{ ctx: ctx, cancel: cancel, diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 02b35fe009f..771673dc3ad 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -25,7 +25,7 @@ type syncOp struct { func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *syncManager, chan *syncOp)) { syncTargets := make(chan *syncOp) - sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error { + sm := NewSyncManager(context.Background(), func(ctx context.Context, ts *types.TipSet) error { ch := make(chan struct{}) syncTargets <- &syncOp{ ts: ts, diff --git a/node/builder_chain.go b/node/builder_chain.go index 1293dcd0c76..a14a245510b 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -44,6 +44,7 @@ import ( "github.com/filecoin-project/lotus/node/impl/net" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/lp2p" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/paychmgr" @@ -93,7 +94,11 @@ var ChainNode = Options( // We don't want the SyncManagerCtor to be used as an fx constructor, but rather as a value. // It will be called implicitly by the Syncer constructor. - Override(new(chain.SyncManagerCtor), func() chain.SyncManagerCtor { return chain.NewSyncManager }), + Override(new(chain.SyncManagerCtor), func(mctx helpers.MetricsCtx) chain.SyncManagerCtor { + return func(syncFn chain.SyncFunc) chain.SyncManager { + return chain.NewSyncManager(mctx, syncFn) + } + }), Override(new(*chain.Syncer), modules.NewSyncer), Override(new(exchange.Client), exchange.NewClient),