Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb context where it's missing, and where possible #12793

Draft
wants to merge 21 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d3b150c
add network tag to metrics
virajbhartiya Nov 28, 2024
747f087
Enhance metrics by adding network tag to daemon and miner commands
virajbhartiya Nov 29, 2024
e420b87
Update CHANGELOG
virajbhartiya Nov 29, 2024
83a2bca
minor fix
virajbhartiya Nov 29, 2024
a80b86b
Update cmd/lotus-miner/run.go
virajbhartiya Dec 4, 2024
1ab14ea
Update cmd/lotus/daemon.go
virajbhartiya Dec 4, 2024
3080368
chore: remove unused import
virajbhartiya Dec 10, 2024
78e0bf5
Merge branch 'master' into network-tag
virajbhartiya Dec 10, 2024
be5d79a
feat(metrics): add network tagging to context across multiple files
virajbhartiya Dec 11, 2024
65a02f6
refactor(imports): reorganize and clean up imports in rcmgr.go, manag…
virajbhartiya Dec 11, 2024
775cfac
feat(metrics): implement AddNetworkTag function for consistent networ…
virajbhartiya Dec 12, 2024
1244845
feat(metrics): update context tagging for network metrics in StorageA…
virajbhartiya Dec 12, 2024
01e8501
refactor(metrics): replace buildconstants with AddNetworkTag for cons…
virajbhartiya Dec 16, 2024
fc56efa
chore(metrics): remove AddNetworkTag calls from multiple files and up…
virajbhartiya Dec 17, 2024
b678818
Update CHANGELOG.md
rvagg Dec 17, 2024
729d0e9
Apply suggestions from code review
rvagg Dec 17, 2024
58bbd4f
Update storage/paths/db_index.go
rvagg Dec 17, 2024
410ec91
Merge branch 'master' into network-tag
rvagg Dec 17, 2024
1e2fe5f
Update CHANGELOG.md
rvagg Dec 17, 2024
5249bf5
chore: plumb contexts wherever possible
rvagg Dec 17, 2024
57ab837
fixup! chore: plumb contexts wherever possible
rvagg Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768))
- During a network upgrade, log migration progress every 2 seconds so they are more helpful and informative. The `LOTUS_MIGRATE_PROGRESS_LOG_SECONDS` environment variable can be used to change this if needed. ([filecoin-project/lotus#12732](https://github.com/filecoin-project/lotus/pull/12732))
- Add F3GetCertificate & F3GetLatestCertificate to the gateway. ([filecoin-project/lotus#12778](https://github.com/filecoin-project/lotus/pull/12778))
- Lotus now reports the network name in the metrics. ([filecoin-project/lotus#12733](https://github.com/filecoin-project/lotus/pull/12733))

# UNRELEASED v.1.32.0

Expand Down
6 changes: 5 additions & 1 deletion blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss.txnViewsCond.L = &ss.txnViewsMx
ss.txnSyncCond.L = &ss.txnSyncMx
ss.chainSyncCond.L = &ss.chainSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

baseCtx := context.Background()
ctx := metrics.AddNetworkTag(baseCtx)

ss.ctx, ss.cancel = context.WithCancel(ctx)

ss.reifyCond.L = &ss.reifyMx
ss.reifyPend = make(map[cid.Cid]struct{})
Expand Down
5 changes: 4 additions & 1 deletion chain/exchange/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
)

// server implements exchange.Server. It services requests for the
Expand All @@ -35,7 +36,9 @@ func NewServer(cs *store.ChainStore) Server {

// HandleStream implements Server.HandleStream. Refer to the godocs there.
func (s *server) HandleStream(stream inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
ctx := context.Background()
ctx = metrics.AddNetworkTag(ctx)
ctx, span := trace.StartSpan(ctx, "chainxchg.HandleStream")
defer span.End()

defer stream.Close() //nolint:errcheck
Expand Down
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
return nil, xerrors.Errorf("make genesis block failed: %w", err)
}

cs := store.NewChainStore(bs, bs, ds, filcns.Weight, j)
cs := store.NewChainStore(context.TODO(), bs, bs, ds, filcns.Weight, j)

genfb := &types.FullBlock{Header: genb.Genesis}
gents := store.NewFullTipSet([]*types.FullBlock{genfb})
Expand Down
2 changes: 1 addition & 1 deletion chain/gen/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto
}

// temp chainstore
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, j)
cs := store.NewChainStore(ctx, bs, bs, datastore.NewMapDatastore(), nil, j)

// Verify PreSealed Data
stateroot, err = VerifyPreSealedData(ctx, cs, sys, stateroot, template, keyIDs, template.NetworkVersion)
Expand Down
27 changes: 14 additions & 13 deletions chain/index/ddls_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package index

import (
"context"
"database/sql"
"testing"

Expand All @@ -16,7 +17,7 @@ const (
)

func TestHasRevertedEventsInTipsetStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// running on empty DB should return false
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestHasRevertedEventsInTipsetStmt(t *testing.T) {
}

func TestGetNonRevertedTipsetCountStmts(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// running on empty DB should return 0
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestGetNonRevertedTipsetCountStmts(t *testing.T) {
}

func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Insert a tipset message
Expand Down Expand Up @@ -268,7 +269,7 @@ func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) {
require.Equal(t, []byte("value3"), entries[0].value)
}
func TestUpdateTipsetToNonRevertedStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// insert a reverted tipset
Expand Down Expand Up @@ -296,7 +297,7 @@ func TestUpdateTipsetToNonRevertedStmt(t *testing.T) {
}

func TestHasNullRoundAtHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// running on empty DB should return true
Expand All @@ -317,7 +318,7 @@ func TestHasNullRoundAtHeightStmt(t *testing.T) {
}

func TestHasTipsetStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// running on empty DB should return false
Expand All @@ -340,7 +341,7 @@ func TestHasTipsetStmt(t *testing.T) {
}

func TestUpdateEventsToRevertedStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Insert a non-reverted tipset
Expand Down Expand Up @@ -390,7 +391,7 @@ func TestUpdateEventsToRevertedStmt(t *testing.T) {
}

func TestCountTipsetsAtHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Test empty DB
Expand Down Expand Up @@ -438,7 +439,7 @@ func TestCountTipsetsAtHeightStmt(t *testing.T) {
}

func TestNonRevertedTipsetAtHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Test empty DB
Expand Down Expand Up @@ -498,7 +499,7 @@ func TestNonRevertedTipsetAtHeightStmt(t *testing.T) {
}

func TestMinNonRevertedHeightStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Test empty DB
Expand Down Expand Up @@ -557,7 +558,7 @@ func verifyMinNonRevertedHeightStmt(t *testing.T, s *SqliteIndexer, expectedMinH
}

func TestGetMsgIdForMsgCidAndTipsetStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Insert a non-reverted tipset
Expand Down Expand Up @@ -603,7 +604,7 @@ func TestGetMsgIdForMsgCidAndTipsetStmt(t *testing.T) {
}

func TestForeignKeyCascadeDelete(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Insert a tipset
Expand Down Expand Up @@ -647,7 +648,7 @@ func TestForeignKeyCascadeDelete(t *testing.T) {
}

func TestInsertTipsetMessage(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", nil, 0, false, 0)
require.NoError(t, err)

ts := tipsetMessage{
Expand Down
2 changes: 1 addition & 1 deletion chain/index/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func PopulateFromSnapshot(ctx context.Context, path string, cs ChainStore) error
}
}

si, err := NewSqliteIndexer(path, cs, 0, false, 0)
si, err := NewSqliteIndexer(ctx, path, cs, 0, false, 0)
if err != nil {
return xerrors.Errorf("failed to create sqlite indexer: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type SqliteIndexer struct {
writerLk sync.Mutex
}

func NewSqliteIndexer(path string, cs ChainStore, gcRetentionEpochs int64, reconcileEmptyIndex bool,
func NewSqliteIndexer(ctx context.Context, path string, cs ChainStore, gcRetentionEpochs int64, reconcileEmptyIndex bool,
maxReconcileTipsets uint64) (si *SqliteIndexer, err error) {

if gcRetentionEpochs != 0 && gcRetentionEpochs < builtin.EpochsInDay {
Expand All @@ -101,7 +101,7 @@ func NewSqliteIndexer(path string, cs ChainStore, gcRetentionEpochs int64, recon
return nil, xerrors.Errorf("failed to setup message index db: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)

defer func() {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion chain/index/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func setupWithHeadIndexed(t *testing.T, headHeight abi.ChainEpoch, rng *pseudo.R
d := newDummyChainStore()
d.SetHeaviestTipSet(head)

s, err := NewSqliteIndexer(":memory:", d, 0, false, 0)
s, err := NewSqliteIndexer(context.Background(), ":memory:", d, 0, false, 0)
require.NoError(t, err)
insertHead(t, s, head, headHeight)

Expand Down
2 changes: 1 addition & 1 deletion chain/messagepool/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
if len(curTs.Blocks()) > 0 {
baseFee = curTs.Blocks()[0].ParentBaseFee
} else {
baseFee, err = mp.api.ChainComputeBaseFee(context.Background(), curTs)
baseFee, err = mp.api.ChainComputeBaseFee(ctx, curTs)
if err != nil {
return nil, xerrors.Errorf("error computing basefee: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
// enable initial prunes
mp.pruneCooldown <- struct{}{}

ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(ctx)

// load the current tipset and subscribe to head changes _before_ loading local messages
mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
Expand Down Expand Up @@ -664,7 +664,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(ctx context.Context, m *types.SignedMe
baseFee = curTs.Blocks()[0].ParentBaseFee
} else {
var err error
baseFee, err = mp.api.ChainComputeBaseFee(context.TODO(), curTs)
baseFee, err = mp.api.ChainComputeBaseFee(ctx, curTs)
if err != nil {
return false, xerrors.Errorf("computing basefee: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,8 @@ func cidsToKey(cids []cid.Cid) string {
// pre-migration functions to run ahead of network upgrades.
//
// This method is not safe to invoke from multiple threads or concurrently with Stop.
func (sm *StateManager) Start(context.Context) error {
var ctx context.Context
ctx, sm.cancel = context.WithCancel(context.Background())
func (sm *StateManager) Start(ctx context.Context) error {
ctx, sm.cancel = context.WithCancel(ctx)
sm.shutdown = make(chan struct{})
go sm.preMigrationWorker(ctx)
return nil
Expand Down
2 changes: 1 addition & 1 deletion chain/store/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestIndexSeeks(t *testing.T) {
ctx := context.TODO()

nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil)
cs := store.NewChainStore(context.Background(), nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

_, _, err = cs.Import(ctx, bytes.NewReader(gencar))
Expand Down
4 changes: 2 additions & 2 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type ChainStore struct {
wg sync.WaitGroup
}

func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, weight WeightFunc, j journal.Journal) *ChainStore {
func NewChainStore(ctx context.Context, chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, weight WeightFunc, j journal.Journal) *ChainStore {
c, _ := arc.NewARC[cid.Cid, mmCids](DefaultMsgMetaCacheSize)
tsc, _ := arc.NewARC[types.TipSetKey, *types.TipSet](DefaultTipSetCacheSize)
if j == nil {
Expand Down Expand Up @@ -191,7 +191,7 @@ func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dsto

hcmetric := func(rev, app []*types.TipSet) error {
for _, r := range app {
stats.Record(context.Background(), metrics.ChainNodeHeight.M(int64(r.Height())))
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}

cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
cs := store.NewChainStore(context.Background(), bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

b.ResetTimer()
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestChainExportImport(t *testing.T) {
}

nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, _, err := cs.Import(context.TODO(), buf)
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestChainImportTipsetKeyCid(t *testing.T) {
}

nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, _, err := cs.Import(ctx, buf)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestChainExportImportFull(t *testing.T) {

nbs := blockstore.NewMemorySync()
ds := datastore.NewMapDatastore()
cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil)
cs := store.NewChainStore(context.Background(), nbs, nbs, ds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, _, err := cs.Import(context.TODO(), buf)
Expand Down
6 changes: 5 additions & 1 deletion chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ func NewSyncer(ds dtypes.MetadataDS,
}

func (syncer *Syncer) Start() {
tickerCtx, tickerCtxCancel := context.WithCancel(context.Background())
ctx := context.Background()
ctx = metrics.AddNetworkTag(ctx)
tickerCtx, tickerCtxCancel := context.WithCancel(ctx)
syncer.syncmgr.Start()

syncer.tickerCtxCancel = tickerCtxCancel
Expand Down Expand Up @@ -205,6 +207,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
}()

ctx := context.Background()
ctx = metrics.AddNetworkTag(ctx)
if fts == nil {
log.Errorf("got nil tipset in InformNewHead")
return false
Expand Down Expand Up @@ -302,6 +305,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
blockstore := bstore.NewMemory()
cst := cbor.NewCborStore(blockstore)
ctx := context.Background()
ctx = metrics.AddNetworkTag(ctx)
var bcids, scids []cid.Cid

for _, m := range fblk.BlsMessages {
Expand Down
4 changes: 2 additions & 2 deletions chain/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ type workerStatus struct {

// sync manager interface

func NewSyncManager(sync SyncFunc) SyncManager {
ctx, cancel := context.WithCancel(context.Background())
func NewSyncManager(ctx context.Context, sync SyncFunc) SyncManager {
ctx, cancel := context.WithCancel(ctx)
return &syncManager{
ctx: ctx,
cancel: cancel,
Expand Down
2 changes: 1 addition & 1 deletion chain/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type syncOp struct {

func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *syncManager, chan *syncOp)) {
syncTargets := make(chan *syncOp)
sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
sm := NewSyncManager(context.Background(), func(ctx context.Context, ts *types.TipSet) error {
ch := make(chan struct{})
syncTargets <- &syncOp{
ts: ts,
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-bench/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ var importBenchCmd = &cli.Command{
}

metadataDs := datastore.NewMapDatastore()
cs := store.NewChainStore(bs, bs, metadataDs, filcns.Weight, nil)
cs := store.NewChainStore(cctx.Context, bs, bs, metadataDs, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

// TODO: We need to supply the actual beacon after v14
Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-miner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var runCmd = &cli.Command{
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "miner"),
)
ctx = metrics.AddNetworkTag(ctx)
// Register all metric views
if err := view.Register(
metrics.MinerNodeViews...,
Expand Down
Loading
Loading