diff --git a/README.md b/README.md index e1ce82f..2d43b33 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,30 @@ smb2Client.rename('path\\to\\my\\file.txt', 'new\\path\\to\\my\\new-file-name.tx }); ``` +### smb2Client.createReadStream ( path ) +Creates a readable stream. +```javascript +var smbStream = smb2Client.createReadStream('path\\to\\my\\file.txt'); +smbStream.pipe(fs.createWriteStream('path/to/local/file.txt')): +smbStream.on('end', function () { + console.log('File copied'); +}); +``` + +Note that when using readable streams, it might make sense to disable auto closing of the SMB2 session by setting the `autoCloseTimeout`to `0`. + +### smb2Client.createWriteStream ( path ) +Creates a writeable stream. +```javascript +var smbStream = smb2Client.createWriteStream('path\\to\\my\\file.txt'); +fs.createReadStream('path/to/local/file.txt').pipe(smbStream); +smbStream.on('finish', function () { + console.log('File copied'); +}); +``` + +Note that when using writeable streams, it might make sense to disable auto closing of the SMB2 session by setting the `autoCloseTimeout`to `0`. + ### smb2Client.close ( ) This function will close the open connection if opened, it will be called automatically after ```autoCloseTimeout``` ms of no SMB2 call on the server. @@ -147,6 +171,7 @@ This function will close the open connection if opened, it will be called automa - [Fabrice Marsaud](https://github.com/marsaud) - [Jay McAliley](https://github.com/jaymcaliley) - [eldrago](https://github.com/eldrago) +- [Friðjón Guðjohnsen](https://github.com/fridjon) ## References diff --git a/lib/api/createreadstream.js b/lib/api/createreadstream.js new file mode 100644 index 0000000..4abec45 --- /dev/null +++ b/lib/api/createreadstream.js @@ -0,0 +1,99 @@ +var RS = require('readable-stream') + , SMB2Forge = require('../tools/smb2-forge') + , SMB2Request = SMB2Forge.request + , SMB2Connection = require('../tools/smb2-connection') + , bigint = require('../tools/bigint') + ; + +/* +* createReadStream +* ================ +* +* Return a read stream for a file on the share +*/ +module.exports = function(filename){ + var connection = this + , file + , bufferedChunks = [] + , readPending = false + , opened = false + , fileLength = 0 + , offset = new bigint(8) + , stop = false + , nbRemainingPackets = 0 + , maxPacketSize = 0x00010000 + , readable = new RS.Readable({ + read: function(size) { + readPending = true; + if (opened) readNext(); + } + }) + ; + + var open = SMB2Connection.requireConnect(function(cb) { + SMB2Request('open', {path:filename}, connection, cb); + }).bind(this); + + open(function(err, openedFile) { + file = openedFile; + opened = true; + if(err) { + readable.emit('error', err); + stop = true; + } + else { + opened = true; + for(var i=0;i 0 && readPending) { + readPending = readable.push(bufferedChunks.shift()); + + if (!offset.lt(fileLength) && bufferedChunks.length === 0 && + nbRemainingPackets === 0 && readPending) { + readable.push(null); + } + } + } + + return readable; +} diff --git a/lib/api/createwritestream.js b/lib/api/createwritestream.js new file mode 100644 index 0000000..dd89b40 --- /dev/null +++ b/lib/api/createwritestream.js @@ -0,0 +1,175 @@ +var RS = require('readable-stream') + , SMB2Forge = require('../tools/smb2-forge') + , SMB2Request = SMB2Forge.request + , SMB2Connection = require('../tools/smb2-connection') + , bigint = require('../tools/bigint') + ; + +/* +* createWriteStream +* ========= +* +* create and return a writeStream to a new file on the share +* +*/ + +module.exports = function(filename) { + var connection = this + , file + , currFileLength = new bigint(8, 0) + , offset = new bigint(8, 0) + , maxPacketSize = new bigint(8, 0x00010000 - 0x71) + , nbRemainingPackets = 0 + , chunkOffset = 0 + , writable = new RS.Writable({ + write: write, + writev: writev, + final: final + }) + , pendingError + , stop = false + , created = false + , writePending = false + , incomingWriteBuffer = [] + , outgoingWriteBuffer = [] + , finalCb = null + ; + + function write(chunk, encoding, cb) { + writev([{chunk: chunk, encoding: encoding}], cb); + } + + function writev(chunks, cb) { + incomingWriteBuffer.push({ chunks: chunks, cb: cb}); + writePending = true; + if (pendingError) { + stop = true; + cb(pendingError); + } else { + if (created) { + writeNext(); + } + } + } + + function final(cb) { + finalCb = cb; + writeNext(); + } + + + var createFile = SMB2Connection.requireConnect(function(cb) { + SMB2Request('create', {path:filename}, connection, cb); + }).bind(this); + + createFile(function(err, f) { + if(err) { + if (outgoingWriteBuffer.length > 0) { + outgoingWriteBuffer[0].cb(err); + stop = true; + } else { + pendingError = err; + } + } + else { + created = true; + file = f; + if (writePending) { + writeNext(); + } + } + }); + + function writeNext() { + if (outgoingWriteBuffer.length !== 0) return; + outgoingWriteBuffer = incomingWriteBuffer; + incomingWriteBuffer = []; + if (outgoingWriteBuffer.length !== 0) startOutgoingBufferWrite(); + if (outgoingWriteBuffer.length === 0 && finalCb) { + SMB2Request('close', file, connection, function(err){ + if(err) finalCb(err); + else { + file = null; + finalCb(); + } + }); + } + } + + function getLengthOfChunksInBuffer(buffer) { + return buffer.reduce(function(acc, curr) { + return acc + curr.chunks.reduce(function(acc2, curr2) { + return acc2 + curr2.chunk.length; + },0); + }, 0) + } + + function startOutgoingBufferWrite() { + currFileLength = currFileLength.add(getLengthOfChunksInBuffer(outgoingWriteBuffer)); + SMB2Request('set_info', {FileId:file.FileId, FileInfoClass:'FileEndOfFileInformation', Buffer:currFileLength.toBuffer()}, connection, function(err){ + if(err) { + stop = true; + outgoingWriteBuffer[0].cb(err); + } + else { + chunkOffset = new bigint(8, 0); + continueOutgoingBufferWrite(); + } + }); + } + + function callback(cb) { + return function(err) { + if(stop) return; + if(err) { + cb(err); + stop = true; + } else { + nbRemainingPackets--; + continueOutgoingBufferWrite(); + } + } + } + + function continueOutgoingBufferWrite() { + if (stop || outgoingWriteBuffer.length === 0) return; + var currChunk = outgoingWriteBuffer[0].chunks[0].chunk; + var currChunkLen = new bigint(8, currChunk.length); + var currCb = outgoingWriteBuffer[0].cb; + while(nbRemainingPackets= currChunk.length) { + outgoingWriteBuffer[0].chunks.shift(); + if (outgoingWriteBuffer[0].chunks.length > 0) { + chunkOffset = new bigint(8, 0); + currChunk = outgoingWriteBuffer[0].chunks[0].chunk; + currChunkLen = new bigint(8, currChunk.length); + } else { + outgoingWriteBuffer[0].cb(); + outgoingWriteBuffer.shift(); + if (outgoingWriteBuffer.length > 0) { + chunkOffset = new bigint(8, 0); + currChunk = outgoingWriteBuffer[0].chunks[0].chunk; + currChunkLen = new bigint(8, currChunk.length); + } else { + chunkOffset = new bigint(8, 0); + } + } + } + nbRemainingPackets++; + } + writeNext(); + } + + + return writable; +}; \ No newline at end of file diff --git a/lib/smb2.js b/lib/smb2.js index c2d700b..37d6002 100644 --- a/lib/smb2.js +++ b/lib/smb2.js @@ -99,6 +99,5 @@ proto.readdir = SMB2Connection.requireConnect(require('./api/readdir')); proto.rmdir = SMB2Connection.requireConnect(require('./api/rmdir')); proto.mkdir = SMB2Connection.requireConnect(require('./api/mkdir')); - - - +proto.createReadStream = require('./api/createreadstream'); +proto.createWriteStream = require('./api/createwritestream'); \ No newline at end of file diff --git a/package.json b/package.json index 72d33c8..e945f2e 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,8 @@ "url": "https://github.com/bchelli/node-smb2" }, "dependencies": { - "ntlm": "^0.1.3" + "ntlm": "^0.1.3", + "readable-stream": "^3.1.1" }, "keywords": [ "SMB", @@ -28,3 +29,4 @@ "Samba" ] } +