From d3a1169cd32d3a325455fb6718edf0054fb9c5ed Mon Sep 17 00:00:00 2001 From: Nico Krause Date: Thu, 23 Jan 2025 16:43:43 +0500 Subject: [PATCH] fix: cleaning up getOrCreateDB --- relay/package.json | 2 +- relay/src/pinner/nameOpsFileManager.js | 9 +++++---- relay/src/pinner/pinningService.js | 2 ++ relay/src/pinner/scanBlockchainForNameOps.js | 17 +++++++++-------- relay/src/pubsubHandler.js | 4 ++-- relay/src/relay.js | 9 +++++---- 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/relay/package.json b/relay/package.json index 54071b8..978ffe0 100644 --- a/relay/package.json +++ b/relay/package.json @@ -1,6 +1,6 @@ { "name": "libp2p-relay", - "version": "0.12.62", + "version": "0.12.63", "private": true, "scripts": { "start:no-restart": "node src/relay.js", diff --git a/relay/src/pinner/nameOpsFileManager.js b/relay/src/pinner/nameOpsFileManager.js index 7acca95..065776f 100644 --- a/relay/src/pinner/nameOpsFileManager.js +++ b/relay/src/pinner/nameOpsFileManager.js @@ -8,6 +8,7 @@ import PQueue from 'p-queue'; dotenv.config(); let db = null; +let orbitdb = null; let isClosing = false; const dbType = process.env.DB_TYPE || 'leveldb'; // Default to OrbitDB @@ -91,14 +92,14 @@ class LevelDBInterface { } } -export async function getOrCreateDB(orbitdb) { - console.log("getOrCreateDB", db); - if (db && db.isOpen()) { +export async function getOrCreateDB(_orbitdb) { + console.log("getOrCreateDB", db?.name); + if (db) { return db; } if (dbType === 'orbitdb') { - const orbitdb = new OrbitDBInterface(orbitdb); + const orbitdb = new OrbitDBInterface(_orbitdb); console.log("orbitdb", orbitdb); db = await orbitdb.open(); console.log("db", db); diff --git a/relay/src/pinner/pinningService.js b/relay/src/pinner/pinningService.js index 924c18d..6c7e8bb 100644 --- a/relay/src/pinner/pinningService.js +++ b/relay/src/pinner/pinningService.js @@ -12,6 +12,7 @@ const MIN_FEE = 1000000; // 0.01 DOI in swartz export class PinningService { constructor(helia, orbitdb, electrumClient) { + console.log("PinningService initialized"); this.helia = helia; this.orbitdb = orbitdb; this.electrumClient = electrumClient; @@ -28,6 +29,7 @@ export class PinningService { } async initializeDatabase() { + console.log("PinningService initializeDatabase"); // If already initialized, return existing instance if (this.db) return this.db; diff --git a/relay/src/pinner/scanBlockchainForNameOps.js b/relay/src/pinner/scanBlockchainForNameOps.js index 0b995df..7a8d6f0 100644 --- a/relay/src/pinner/scanBlockchainForNameOps.js +++ b/relay/src/pinner/scanBlockchainForNameOps.js @@ -21,11 +21,11 @@ export async function scanBlockchainForNameOps( helia, orbitdb, tip, + pinningService, _stopToken ) { try { - - pinningService = new PinningService(helia, orbitdb, electrumClient); + console.log("scanBlockchainForNameOps"); stopToken.isStopped = _stopToken; console.info( @@ -64,6 +64,7 @@ export async function scanBlockchainForNameOps( tip, state, orbitdb, + pinningService, stopToken ); } finally { @@ -79,21 +80,20 @@ async function processBlocks( tip, origState, orbitdb, + pinningService, stopToken ) { const MIN_HEIGHT = 0; let currentDay = null; - let state = null; const pinQueue = new PQueue({ concurrency: 5 }); const dbQueue = new PQueue({ concurrency: 1 }); for (let height = startHeight; height > MIN_HEIGHT; height--) { if (stopToken.isStopped) break; try { - console.info(`Processing block at height ${height}`); const { nameOpUtxos, blockDate, electrumClient: _electrumClient } = await processBlockAtHeight(height, electrumClient) electrumClient = _electrumClient - console.info(`nameOpUtxos ${nameOpUtxos} at ${blockDate}`); + // console.info(`nameOpUtxos ${nameOpUtxos} at ${blockDate}`); const blockDay = moment.utc(blockDate).format('YYYY-MM-DD'); if (blockDay !== currentDay) { currentDay = blockDay; @@ -122,7 +122,8 @@ async function processBlocks( helia, nameOp, nameOp.nameId, - sanitizedValue + sanitizedValue, + pinningService ) ); } else { @@ -134,7 +135,7 @@ async function processBlocks( // Update scanning state every 10 blocks or when reaching the stored tip height if (height % 10 === 0 || !origState || height === origState?.tipHeight) { console.info(`Updating scanning state at height ${height}`); - state = await updateScanningState({ + await updateScanningState({ lastBlockHeight: height, tipHeight: tip.height, }); @@ -189,7 +190,7 @@ async function reconnectElectrumClient() { } } -async function pinIpfsContent(electrumClient, helia, nameOp, nameId, ipfsUrl) { +async function pinIpfsContent(electrumClient, helia, nameOp, nameId, ipfsUrl, pinningService) { if (!ipfsUrl.startsWith('ipfs://')) return; const cid = ipfsUrl.replace('ipfs://', ''); diff --git a/relay/src/pubsubHandler.js b/relay/src/pubsubHandler.js index 726eee1..3faf76b 100644 --- a/relay/src/pubsubHandler.js +++ b/relay/src/pubsubHandler.js @@ -29,9 +29,9 @@ export function setupPubsub( electrumClient, fsHelia, contentTopic, - tipWatcher + tipWatcher, + pinningService ) { - const pinningService = new PinningService(helia, orbitdb, electrumClient); helia.libp2p.services.pubsub.subscribe(contentTopic); helia.libp2p.services.pubsub.subscribe("doichain._peer-discovery._p2p._pubsub") diff --git a/relay/src/relay.js b/relay/src/relay.js index 9e2034e..6feccc5 100644 --- a/relay/src/relay.js +++ b/relay/src/relay.js @@ -93,7 +93,7 @@ logger.info('Helia and OrbitDB are running'); const fsHelia = unixfs(helia); const tipWatcher = new TipWatcher(electrumClient); - +const pinningService = new PinningService(helia, orbitdb, electrumClient); setupPubsub( helia, orbitdb, @@ -101,10 +101,11 @@ setupPubsub( fsHelia, CONTENT_TOPIC, tipWatcher, + pinningService ); // createHttpServr(helia, orbitdb, electrumClient, tipWatcher); -const pinningService = new PinningService(helia, orbitdb, electrumClient); + let isScanning = false; tipWatcher.on('newTip', async (tip) => { @@ -117,7 +118,7 @@ tipWatcher.on('newTip', async (tip) => { } isScanning = true; - await scanBlockchainForNameOps(electrumClient, helia, orbitdb, tip); + await scanBlockchainForNameOps(electrumClient, helia, orbitdb, tip, pinningService); isScanning = false; // Then check for expired pins @@ -164,7 +165,7 @@ if (!argv['disable-scanning']) { logger.info('Scan already in progress, skipping initial scan'); } else { isScanning = true; - await scanBlockchainForNameOps(electrumClient, helia, orbitdb); + await scanBlockchainForNameOps(electrumClient, helia, orbitdb, null, pinningService); isScanning = false; } } else {