archive.mjs
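// Archives entries from the Redis stream "socket:snakebin" into newline-delimited,
// zstd-compressed log files under output/. Files are rotated once they pass
// MAX_FILE_SIZE, and the ID of the last archived entry is persisted under the
// "snakebin:last" key so the archiver can resume where it left off.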
import process from "process";
import { createWriteStream } from "fs";
import dateformat from "dateformat";
import stream from "stream";
import prettyBytes from "pretty-bytes";
import { ZSTDCompress } from "simple-zstd";
import ioredis from "ioredis";
// General
const MAX_FILE_SIZE = 1024 * 1024 * 256; // 256MB log files.
const LAST_KEY = "snakebin:last";
let running = true;
// Set up Redis.
const local_client = new ioredis();
// Promise-based sleep helper.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
// Resume from the last archived stream ID, or "$" (only new entries) on a fresh start.
let last_item = (await local_client.get(LAST_KEY)) || "$";
let worker = async () => {
  // Stats state.
  let bytes = 0;
  let processed = 0;
  // Streams for the file pipeline.
  let compressor_stream = ZSTDCompress(8);
  let output_file = createWriteStream(
    "output/" +
      Math.floor(new Date().getTime() / 1000) +
      "-logs-" +
      dateformat(new Date(), "yyyy-mm-dd-HH-MM-ss") +
      ".txt.zst"
  );
  let read_stream = new stream.Readable({
    read: async (size) => {
      let bytes_get = 0;
      // Stop attempting to read more from Redis if the output file is larger than the max.
      if (output_file.bytesWritten > MAX_FILE_SIZE) return;
      // Keep querying Redis until at least `size` bytes have been handed to the compressor.
      while (bytes_get < size) {
        let read_response;
        // Attempt to fetch a batch of items, blocking for up to 10 seconds.
        try {
          read_response = await local_client.xread(
            "BLOCK",
            "10000",
            "COUNT",
            "250",
            "STREAMS",
            "socket:snakebin",
            last_item
          );
          console.log(last_item);
          await delay(1000);
        } catch (e) {
          console.error(e);
          continue;
        }
        // Retry if we were unable to get more items.
        if (!read_response) {
          continue;
        }
        let items = read_response[0][1];
        for (let item of items) {
          last_item = item[0];
          let payload = item[1][1];
          // Add the item to the file.
          read_stream.push(payload);
          read_stream.push("\n");
          // Update the stat-keeping variables.
          bytes += payload.length;
          bytes_get += payload.length;
          processed += 1;
        }
        // Update the last item pointer stored in Redis.
        await local_client.set(LAST_KEY, last_item);
      }
    },
  });
  compressor_stream.on("end", () => {
    console.log("ended compressor");
    output_file.end();
  });
  // Exit the process if we are exiting and all writes are complete.
  output_file.on("finish", () => {
    if (!running) process.exit();
  });
  // Start the stream.
  console.log("starting pipe to file");
  read_stream.pipe(compressor_stream).pipe(output_file);
  // Report progress once a second until shutdown or the file reaches its size cap.
  while (running && output_file.bytesWritten < MAX_FILE_SIZE) {
    await delay(1000);
    console.log(
      `${prettyBytes(bytes)} bytes read, ${prettyBytes(
        output_file.bytesWritten
      )} bytes written, ${processed} processed`
    );
  }
  console.log("loop done, ending");
  compressor_stream.end();
};
let main = async () => {
  await worker();
  if (running) {
    setTimeout(main, 1000);
  }
};
process.on("SIGINT", function () {
  console.log("Caught interrupt signal");
  running = false;
});
main();