@@ -8,11 +8,21 @@ import { Readable } from "stream";
88import { appendFile , writeFile } from "fs/promises" ;
99import { readFileSync } from "fs" ;
1010
11+ function readFromStream ( stream : Readable ) : Promise < string > {
12+ return new Promise < string > ( ( resolve , reject ) => {
13+ const chunks : Buffer [ ] = [ ] ;
14+ stream
15+ . on ( "data" , chunk => chunks . push ( chunk ) )
16+ . on ( "end" , ( ) => resolve ( Buffer . concat ( chunks ) . toString ( "utf8" ) ) )
17+ . on ( "error" , reject ) ;
18+ } ) ;
19+ }
20+
// Path of the log fixture shared by every suite in this file.
const fixturePath = "./tests/logs.fixture.log";
// Pristine fixture contents, captured once at module load so tests that
// append to the file can restore it afterwards (see the finally blocks).
const fixtureContents = readFileSync(fixturePath, "utf8");
23+
1124describe ( "logs-stream" , ( ) => {
12- const fixturePath = "./tests/logs.fixture.log" ;
1325 describe ( "createFollowReadStream()" , ( ) => {
14- const fixtureContents = readFileSync ( fixturePath , "utf8" ) ;
15-
1626 it ( "should load all of the data from the file" , async ( ) => {
1727 const logStream = createFollowReadStream ( fixturePath ) ;
1828
@@ -36,36 +46,39 @@ describe("logs-stream", () => {
3646
3747 const logStream = createFollowReadStream ( fixturePath ) ;
3848
39- // don't `await`, because we need to write the file back to it's original state
40- const loadingLogs = await new Promise < string > ( ( resolve , reject ) => {
41- const logLines : Buffer [ ] = [ ] ;
49+ try {
50+ const logsReadAfterEOF = await new Promise < string > (
51+ ( resolve , reject ) => {
52+ const logLines : Buffer [ ] = [ ] ;
4253
43- logStream
44- // start reading log lines immediately, otherwise the file contents are buffered
45- . on ( "data" , ( ) => { } )
46- . once ( "eof" , ( ) => {
4754 logStream
48- . on ( "data" , data => logLines . push ( data ) )
55+ // start reading log lines immediately, otherwise the file contents are buffered
56+ . on ( "data" , ( ) => { } )
57+ // we wait until eof, so that we can ignore everything that's already in the file
4958 . once ( "eof" , ( ) => {
50- logStream . destroy ( ) ;
51- const logs = Buffer . concat ( logLines ) . toString ( "utf8" ) ;
52- resolve ( logs ) ;
53- } ) ;
54- appendFile ( fixturePath , newLogLine ) ;
55- } )
56- . on ( "error" , reject ) ;
57- } ) ;
59+ logStream
60+ . on ( "data" , data => logLines . push ( data ) )
61+ . once ( "eof" , ( ) => {
62+ const logs = Buffer . concat ( logLines ) . toString ( "utf8" ) ;
63+ resolve ( logs ) ;
64+ } ) ;
65+ appendFile ( fixturePath , newLogLine ) ;
66+ } )
67+ . on ( "error" , reject ) ;
68+ }
69+ ) ;
5870
59- try {
60- assert . deepStrictEqual ( await loadingLogs , newLogLine ) ;
71+ assert . deepStrictEqual ( logsReadAfterEOF , newLogLine ) ;
6172 } finally {
73+ logStream . destroy ( ) ;
74+ // rewrite the contents back to the fixture file, removing the additional data that we appended
6275 writeFile ( fixturePath , fixtureContents ) ;
6376 }
6477 } ) ;
6578 } ) ;
6679
6780 describe ( "filterLogsStream()" , ( ) => {
68- // First log stamped at epoch
    // First log stamped at "epoch"
6982 const epoch = Date . parse ( "2020-01-01 00:00:00 UTC" ) ;
7083 // subsequent logs are each incremented by 1 minute
7184 const timestampFromLineNumber = i => epoch + i * 60000 ;
@@ -91,24 +104,16 @@ describe("logs-stream", () => {
91104 const since = timestampFromLineNumber ( logLinesToSkip ) ;
92105
93106 const input = Readable . from ( logLines ) ;
94- const expected = Buffer . from (
95- logLines . slice ( logLinesToSkip ) . join ( "" ) ,
96- "utf8"
97- ) ;
107+ const expected = logLines . slice ( logLinesToSkip ) . join ( "" ) ;
98108
99109 const filteredStream = filterLogsStream ( { input, since } ) ;
100110
101- const result = await new Promise < Buffer > ( ( resolve , reject ) => {
102- const chunks : Buffer [ ] = [ ] ;
103- filteredStream
104- . on ( "data" , chunk => chunks . push ( chunk ) )
105- . on ( "end" , ( ) => resolve ( Buffer . concat ( chunks ) ) )
106- . on ( "error" , reject ) ;
107- } ) ;
111+ const result = await readFromStream ( filteredStream ) ;
108112
109- assert (
110- result . equals ( expected ) ,
111- `filterLogsStream() didn't correctly skip first ${ logLinesToSkip } lines from the input log stream. Expected ${ expected . length } bytes. Got ${ result . length } bytes`
113+ assert . strictEqual (
114+ result ,
115+ expected ,
116+ `filterLogsStream() didn't correctly skip first ${ logLinesToSkip } lines from the input log stream. Expected ${ expected . length } bytes. Got ${ result . length } bytes.`
112117 ) ;
113118 } ) ;
114119
@@ -118,31 +123,161 @@ describe("logs-stream", () => {
118123 const until = timestampFromLineNumber ( logLinesToReturn - 1 ) ;
119124
120125 const input = Readable . from ( logLines ) ;
121- const expected = Buffer . from (
122- logLines . slice ( 0 , logLinesToReturn ) . join ( "" ) ,
123- "utf8"
124- ) ;
126+ const expected = logLines . slice ( 0 , logLinesToReturn ) . join ( "" ) ;
125127
126128 const filteredStream = filterLogsStream ( { input, until } ) ;
129+ const result = await readFromStream ( filteredStream ) ;
127130
128- const result = await new Promise < Buffer > ( ( resolve , reject ) => {
129- const chunks : Buffer [ ] = [ ] ;
130- filteredStream
131- . on ( "data" , chunk => chunks . push ( chunk ) )
132- . on ( "end" , ( ) => resolve ( Buffer . concat ( chunks ) ) )
133- . on ( "error" , reject ) ;
134- } ) ;
135-
136- assert (
137- result . equals ( expected ) ,
138- `filterLogsStream() didn't correctly return first ${ logLinesToReturn } lines from the input log stream. Expected ${ expected . length } bytes. Got ${ result . length } bytes`
131+ assert . strictEqual (
132+ result ,
133+ expected ,
134+ `filterLogsStream() didn't correctly return first ${ logLinesToReturn } lines from the input log stream. Expected ${ expected . length } bytes. Got ${ result . length } bytes.`
139135 ) ;
140136 } ) ;
141137 } ) ;
142138
143139 describe ( "getLogsStream()" , ( ) => {
144- it ( "must be tested" , ( ) => {
145- throw new Error ( "todo: implement getLogsStream() tests" ) ;
140+ it ( "should read the specified file" , async ( ) => {
141+ const logsStream = getLogsStream ( fixturePath , { } ) ;
142+ const result = await readFromStream ( logsStream ) ;
143+ logsStream . destroy ( ) ;
144+
145+ assert . strictEqual ( result , fixtureContents ) ;
146+ } ) ;
147+
148+ it ( "should filter the specified date range" , async ( ) => {
149+ const fixtureLines = fixtureContents . split ( "\n" ) ;
150+ const skipFromFront = 2 ;
151+ const skipFromBack = 2 ;
152+
153+ const matchingLines = fixtureLines . slice (
154+ skipFromFront ,
155+ fixtureLines . length - skipFromBack - 1 // -1 because 0-based index
156+ ) ;
157+
158+ const since = Date . parse ( matchingLines [ 0 ] . slice ( 0 , 24 ) ) ;
159+ const until = Date . parse (
160+ matchingLines [ matchingLines . length - 1 ] . slice ( 0 , 24 )
161+ ) ;
162+
163+ const logsStream = getLogsStream ( fixturePath , {
164+ since,
165+ until
166+ } ) ;
167+
168+ const result = await readFromStream ( logsStream ) ;
169+ logsStream . destroy ( ) ;
170+
171+ assert . strictEqual (
172+ result ,
173+ matchingLines . join ( "\n" ) + "\n" ,
174+ `expected only long lines since ${ new Date (
175+ since
176+ ) . toISOString ( ) } and until ${ new Date ( until ) . toISOString ( ) } `
177+ ) ;
178+ } ) ;
179+
180+ it ( "should follow the specified file" , async ( ) => {
181+ const newLogLine = `${ new Date ( ) . toISOString ( ) } new log line\n` ;
182+
183+ const logStream = getLogsStream ( fixturePath , {
184+ follow : true
185+ } ) ;
186+
187+ try {
188+ const logsReadAfterEOF = await new Promise < string > (
189+ ( resolve , reject ) => {
190+ const logLines : Buffer [ ] = [ ] ;
191+
192+ logStream
193+ // start reading log lines immediately, otherwise the file contents are buffered
194+ . on ( "data" , ( ) => { } )
195+ // we wait until eof, so that we can ignore everything that's already in the file
196+ . once ( "eof" , ( ) => {
197+ logStream
198+ . on ( "data" , data => logLines . push ( data ) )
199+ . once ( "eof" , ( ) => {
200+ const logs = Buffer . concat ( logLines ) . toString ( "utf8" ) ;
201+ resolve ( logs ) ;
202+ } ) ;
203+ appendFile ( fixturePath , newLogLine ) ;
204+ } )
205+ . on ( "error" , reject ) ;
206+ }
207+ ) ;
208+
209+ assert . deepStrictEqual ( logsReadAfterEOF , newLogLine ) ;
210+ } finally {
211+ logStream . destroy ( ) ;
212+ // rewrite the contents back to the fixture file, removing the additional data that we appended
213+ writeFile ( fixturePath , fixtureContents ) ;
214+ }
215+ } ) ;
216+
217+ it ( "should follow the specified file, returning the filtered results" , async ( ) => {
218+ const fixtureLines = fixtureContents . split ( "\n" ) ;
219+ const skipFromFront = 2 ;
220+ const skipFromBack = 2 ;
221+
222+ const matchingLines = fixtureLines . slice (
223+ skipFromFront ,
224+ fixtureLines . length - skipFromBack - 1 // -1 because 0-based index
225+ ) ;
226+
227+ const since = Date . parse ( matchingLines [ 0 ] . slice ( 0 , 24 ) ) ;
228+ const until = Date . parse (
229+ matchingLines [ matchingLines . length - 1 ] . slice ( 0 , 24 )
230+ ) ;
231+
232+ const tooEarlyLogLine = `${ new Date (
233+ since - 10
234+ ) . toISOString ( ) } non-matching log line\n`;
235+
236+ const matchingLogLine = `${ new Date (
237+ since
238+ ) . toISOString ( ) } matching log line\n`;
239+
240+ const tooLateLogLine = `${ new Date (
241+ until + 10
242+ ) . toISOString ( ) } non-matching log line\n`;
243+
244+ const logStream = getLogsStream ( fixturePath , {
245+ since,
246+ until,
247+ follow : true
248+ } ) ;
249+
250+ try {
251+ const logsReadAfterEOF = await new Promise < string > (
252+ ( resolve , reject ) => {
253+ const logLines : Buffer [ ] = [ ] ;
254+
255+ logStream
256+ // start reading log lines immediately, otherwise the file contents are buffered
257+ . on ( "data" , ( ) => { } )
258+ // we wait until eof, so that we can ignore everything that's already in the file
259+ . once ( "eof" , ( ) => {
260+ logStream
261+ . on ( "data" , data => logLines . push ( data ) )
262+ . once ( "eof" , ( ) => {
263+ const logs = Buffer . concat ( logLines ) . toString ( "utf8" ) ;
264+ resolve ( logs ) ;
265+ } ) ;
266+ appendFile (
267+ fixturePath ,
268+ [ tooEarlyLogLine , matchingLogLine , tooLateLogLine ] . join ( "\n" )
269+ ) ;
270+ } )
271+ . on ( "error" , reject ) ;
272+ }
273+ ) ;
274+
275+ assert . deepStrictEqual ( logsReadAfterEOF , matchingLogLine ) ;
276+ } finally {
277+ logStream . destroy ( ) ;
278+ // rewrite the contents back to the fixture file, removing the additional data that we appended
279+ writeFile ( fixturePath , fixtureContents ) ;
280+ }
146281 } ) ;
147282 } ) ;
148283} ) ;
0 commit comments