Skip to content

Commit

Permalink
support offset in pull
Browse files Browse the repository at this point in the history
  • Loading branch information
Fishrock123 committed Jun 6, 2019
1 parent ae0a395 commit 426473c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
14 changes: 8 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class FileSource {
this.sink = sink
}

pull (error, buffer) {
pull (error, buffer, offset) {
if (error) {
if (typeof this.fd === 'number') {
fs.close(this.fd, (closeError) => {
Expand All @@ -76,27 +76,29 @@ class FileSource {

this.fd = fd

this.readFromFile(buffer)
this.readFromFile(buffer, offset)
})
} else {
this.readFromFile(buffer)
this.readFromFile(buffer, offset)
}
}

readFromFile (buffer) {
readFromFile (buffer, offset) {
if (typeof this.fd !== 'number') {
return this.pull(null, buffer)
}

fs.read(this.fd, buffer, 0, buffer.length, this.pos, (error, bytesRead) => {
const pos = offset !== undefined ? offset : this.pos

fs.read(this.fd, buffer, 0, buffer.length, pos, (error, bytesRead) => {
if (error) {
fs.close(this.fd, (closeError) => {
this.fd = null
this.sink.next(status.error, closeError || error)
})
} else {
if (bytesRead > 0) {
this.pos += bytesRead;
if (offset === undefined) this.pos += bytesRead;
this.sink.next(status.continue, null, buffer, bytesRead)
} else {
fs.close(this.fd, (closeError) => {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "A file system source for the BOB streaming protocol.",
"main": "index.js",
"scripts": {
"test": "node ./test/test-basic.js"
"test": "node ./test/test-basic.js && node ./test/test-offset.js"
},
"repository": {
"type": "git",
Expand Down
8 changes: 5 additions & 3 deletions test/assertion-helper-sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ const assert = require('assert').strict
const status_type = require('bob-status')

class AssertionSink {
constructor (assertions_queue) {
constructor (assertions_queue, options = {}) {
this.source = null
this.bindCb = null

this.offset = options.offset

this._queue_index = 0
this._assertions_queue = assertions_queue
}
Expand All @@ -19,7 +21,7 @@ class AssertionSink {

this.source.bindSink(this)

this.source.pull(null, Buffer.alloc(this._assertions_queue[this._queue_index].length))
this.source.pull(null, Buffer.alloc(this._assertions_queue[this._queue_index].length), this.offset)
}

next (status, error, buffer, bytes) {
Expand All @@ -33,7 +35,7 @@ class AssertionSink {
return this.source.pull(null, Buffer.alloc(0))
}

this.source.pull(null, Buffer.alloc(this._assertions_queue[this._queue_index].length))
this.source.pull(null, Buffer.alloc(this._assertions_queue[this._queue_index].length), this.offset)
}
}

Expand Down
24 changes: 24 additions & 0 deletions test/test-offset.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const FileSource = require('../index.js')
const AssertionSink = require('./assertion-helper-sink')

const fileSource = new FileSource('./test/fixtures/file')
const assertionSink = new AssertionSink(
[
'Chunk two\ndata data\n\n',
'Chunk two\ndata data\n\n',
'Chunk two\ndata data\n\n'
],
{
offset: 'Chunk one\ndata data\n\n'.length
}
)

assertionSink.bindSource(fileSource, error => {
if (error)
console.error('Stream returned ->', error.stack)
else {
console.log('ok')
}
})

0 comments on commit 426473c

Please sign in to comment.