Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"draftlog": "^1.0.13",
"mongodb": "^6.5.0",
"pg": "^8.11.5",
"pg-format": "^1.0.4",
"sqlite3": "^5.1.7"
}
}
}
16 changes: 7 additions & 9 deletions src/background-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ const db = await getPostgresConnection()

process.on('message', (items) => {
  // console.log(` ${process.pid} received ${items.length} items`,);
  // Bulk-insert the whole batch in one statement, then report the batch
  // size so the parent can accumulate total progress. (The superseded
  // per-item insert loop is removed — running both paths would insert
  // every item twice and send mixed progress messages.)
  db.students.insertMany(items)
    .then(() => {
      process.send(items.length);
    })
    .catch((error) => {
      // Log and keep the worker alive. NOTE(review): failed batches are
      // not retried, so the parent's progress may never reach 100% —
      // confirm this is acceptable.
      console.error(error);
    });
});
1 change: 0 additions & 1 deletion src/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ function initializeCluster({ backgroundTaskFile, clusterSize, onMessage }) {
})

child.on('message', (message) => {
if (message !== 'item-done') return
onMessage(message)
})

Expand Down
12 changes: 12 additions & 0 deletions src/data-streaming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { StreamCache } from './stram-cache.js'
import { getMongoConnection } from './db.js'

// Batch size for each message forwarded to the parent process.
const ITEMS_PER_PAGE = 4000

const mongoDB = await getMongoConnection()

// Pipe every student document through a batching cache so the parent
// process receives arrays of up to ITEMS_PER_PAGE items at a time.
const studentsCursor = mongoDB.students.find().stream()
const batcher = new StreamCache(studentsCursor, ITEMS_PER_PAGE)

batcher.stream().on('data', (serializedBatch) => {
  // Each chunk is a JSON-encoded array of documents; decode before sending.
  process.send(JSON.parse(serializedBatch));
});
10 changes: 10 additions & 0 deletions src/db.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { MongoClient } from 'mongodb';
import pg from 'pg';
import format from 'pg-format';
const { Client } = pg;
// Connection URL for MongoDB

Expand Down Expand Up @@ -47,6 +48,15 @@ async function getPostgresConnection() {

await client.query(query, values);

},
async insertMany(persons) {
const query = format(
'INSERT INTO students (name, email, age, registered_at) VALUES %L',
persons.map((person) => [person.name, person.email, person.age, person.registered_at])
);

await client.query(query);

},
async list(limit = 100) {
const query = 'SELECT * FROM students LIMIT $1';
Expand Down
47 changes: 21 additions & 26 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,18 @@
import { initialize } from "./cluster.js"
import { getMongoConnection, getPostgresConnection } from './db.js'
import cliProgress from 'cli-progress'
import { setTimeout } from 'node:timers/promises'
import os from 'os';

const mongoDB = await getMongoConnection()
const postgresDB = await getPostgresConnection()
const ITEMS_PER_PAGE = 4000
const CLUSTER_SIZE = 99
// const ITEMS_PER_PAGE = 4000
const CLUSTER_SIZE = os.cpus().length
const TASK_FILE = new URL('./background-task.js', import.meta.url).pathname
const DATA_STREAMING_FILE = new URL('./data-streaming.js', import.meta.url).pathname

// console.log(`there was ${await postgresDB.students.count()} items on Postgres, deleting all...`)
await postgresDB.students.deleteAll()

// Walks the students collection page by page, yielding each non-empty
// batch until MongoDB returns no more documents. `page` is the skip
// offset, advanced by itemsPerPage after every batch.
async function* getAllPagedData(itemsPerPage, page = 0) {
  while (true) {
    const batch = await mongoDB.students.find().skip(page).limit(itemsPerPage).toArray()
    if (!batch.length) return

    yield batch
    page += itemsPerPage
  }
}

const total = await mongoDB.students.countDocuments()
// console.log(`total items on DB: ${total}`)

Expand All @@ -37,25 +28,29 @@ const cp = initialize(
backgroundTaskFile: TASK_FILE,
clusterSize: CLUSTER_SIZE,
amountToBeProcessed: total,
async onMessage(message) {
progress.increment()
async onMessage(cumulativeProcessed) {
totalProcessed += cumulativeProcessed;
progress.update(totalProcessed);

if (++totalProcessed !== total) return
if (totalProcessed !== total) return
// console.log(`all ${amountToBeProcessed} processed! Exiting...`)
progress.stop()
cp.killAll()

const insertedOnSQLite = await postgresDB.students.count()
console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLite}`)
console.log(`are the same? ${total === insertedOnSQLite ? 'yes' : 'no'}`)
const insertedOnSQLPostGres = await postgresDB.students.count()
console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLPostGres}`)
console.log(`are the same? ${total === insertedOnSQLPostGres ? 'yes' : 'no'}`)
process.exit()

}
}
)
await setTimeout(1000)

for await (const data of getAllPagedData(ITEMS_PER_PAGE)) {
cp.sendToChild(data)
}

initialize(
{
backgroundTaskFile: DATA_STREAMING_FILE,
clusterSize: 1,
async onMessage(message) {
cp.sendToChild(message)
}
}
)
33 changes: 33 additions & 0 deletions src/stram-cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Readable } from 'stream';

/**
 * Buffers items from an input stream and re-emits them in batches of
 * `cacheThreshold`, each batch serialized as one JSON string chunk.
 */
export class StreamCache {
  /**
   * @param {import('stream').Readable} inputStream - source of individual items
   * @param {number} [cacheThreshold=4000] - items per emitted batch
   */
  constructor(inputStream, cacheThreshold = 4000) {
    this.cacheStream = new Readable({
      // No-op: data is pushed from the input stream's events, not pulled.
      read() { }
    });
    this.cache = [];
    this.cacheThreshold = cacheThreshold;
    inputStream.on('data', this._addDataToCache);
    inputStream.on('end', () => {
      // Flush the final partial batch, then signal EOF the proper way:
      // push(null) lets the stream machinery emit 'end' after all
      // buffered chunks are delivered. (Manually emitting 'end' could
      // fire before consumers received the last batch, and the stream
      // would never actually end.)
      this._emitCache();
      this.cacheStream.push(null);
    });
    // Propagate input failures so consumers see an 'error' event instead
    // of a stream that silently never ends.
    inputStream.on('error', (err) => this.cacheStream.destroy(err));
  }

  // Arrow field keeps `this` bound when used directly as an event handler.
  _addDataToCache = (data) => {
    this.cache.push(data);
    if (this.cache.length >= this.cacheThreshold) {
      this._emitCache();
    }
  }

  // Serialize and emit the current batch, if any, then reset the buffer.
  _emitCache() {
    if (!this.cache.length) return
    this.cacheStream.push(JSON.stringify(this.cache));
    this.cache = [];
  }

  /** @returns {import('stream').Readable} the batched output stream */
  stream() {
    return this.cacheStream;
  }
}