From be1272cd7cad9c467e0d87424dbf2e40ade6a9d9 Mon Sep 17 00:00:00 2001 From: Gareth Jones Date: Wed, 5 Sep 2012 10:58:28 +1000 Subject: [PATCH] moved streams code around, added stub for DateRollingFileStream --- lib/streams.js | 268 ------------------- lib/streams/BaseRollingFileStream.js | 91 +++++++ lib/streams/BufferedWriteStream.js | 77 ++++++ lib/streams/DateRollingFileStream.js | 16 ++ lib/streams/RollingFileStream.js | 110 ++++++++ lib/streams/index.js | 3 + test/streams/DateRollingFileStream-test.js | 49 ++++ test/{ => streams}/bufferedStream-test.js | 4 +- test/{ => streams}/rollingFileStream-test.js | 2 +- 9 files changed, 349 insertions(+), 271 deletions(-) delete mode 100644 lib/streams.js create mode 100644 lib/streams/BaseRollingFileStream.js create mode 100644 lib/streams/BufferedWriteStream.js create mode 100644 lib/streams/DateRollingFileStream.js create mode 100644 lib/streams/RollingFileStream.js create mode 100644 lib/streams/index.js create mode 100644 test/streams/DateRollingFileStream-test.js rename test/{ => streams}/bufferedStream-test.js (97%) rename test/{ => streams}/rollingFileStream-test.js (98%) diff --git a/lib/streams.js b/lib/streams.js deleted file mode 100644 index 4e80b51..0000000 --- a/lib/streams.js +++ /dev/null @@ -1,268 +0,0 @@ -var util = require('util'), -fs = require('fs'), -path = require('path'), -events = require('events'), -async = require('async'); - -function debug(message) { -// util.debug(message); -// console.log(message); -} - -function BufferedWriteStream(stream) { - var that = this; - this.stream = stream; - this.buffer = []; - this.canWrite = false; - this.bytes = 0; - - this.stream.on("open", function() { - that.canWrite = true; - that.flushBuffer(); - }); - - this.stream.on("error", function (err) { - that.emit("error", err); - }); - - this.stream.on("drain", function() { - that.canWrite = true; - that.flushBuffer(); - }); -} - -util.inherits(BufferedWriteStream, events.EventEmitter); - -Object.defineProperty( - BufferedWriteStream.prototype, - "fd", - { - get: function() { return this.stream.fd; }, - set: function(newFd) { - this.stream.fd = newFd; - this.bytes = 0; - } - } -); - -Object.defineProperty( - BufferedWriteStream.prototype, - "bytesWritten", - { - get: function() { return this.bytes; } - } -); - -BufferedWriteStream.prototype.write = function(data, encoding) { - this.buffer.push({ data: data, encoding: encoding }); - this.flushBuffer(); -}; - -BufferedWriteStream.prototype.end = function(data, encoding) { - if (data) { - this.buffer.push({ data: data, encoding: encoding }); - } - this.flushBufferEvenIfCannotWrite(); -}; - -BufferedWriteStream.prototype.writeToStream = function(toWrite) { - this.bytes += toWrite.data.length; - this.canWrite = this.stream.write(toWrite.data, toWrite.encoding); -}; - -BufferedWriteStream.prototype.flushBufferEvenIfCannotWrite = function() { - while (this.buffer.length > 0) { - this.writeToStream(this.buffer.shift()); - } -}; - -BufferedWriteStream.prototype.flushBuffer = function() { - while (this.buffer.length > 0 && this.canWrite) { - this.writeToStream(this.buffer.shift()); - } -}; - -function BaseRollingFileStream(filename, options) { - this.filename = filename; - this.options = options || { encoding: 'utf8', mode: 0644, flags: 'a' }; - this.rolling = false; - this.writesWhileRolling = []; - this.currentSize = 0; - - function currentFileSize(file) { - var fileSize = 0; - try { - fileSize = fs.statSync(file).size; - } catch (e) { - // file does not exist - } - return fileSize; - } - - function throwErrorIfArgumentsAreNotValid() { - if (!filename) { - throw new Error("You must specify a filename"); - } - } - - throwErrorIfArgumentsAreNotValid(); - - BaseRollingFileStream.super_.call(this, this.filename, this.options); - this.currentSize = currentFileSize(this.filename); -} -util.inherits(BaseRollingFileStream, fs.FileWriteStream); - -BaseRollingFileStream.prototype.initRolling = function() { - var that = this; - - function emptyRollingQueue() { - debug("emptying the rolling queue"); - var toWrite; - while ((toWrite = that.writesWhileRolling.shift())) { - BaseRollingFileStream.super_.prototype.write.call(that, toWrite.data, toWrite.encoding); - that.currentSize += toWrite.data.length; - if (that.shouldRoll()) { - that.flush(); - return true; - } - } - that.flush(); - return false; - } - - this.rolling = true; - this.roll(this.filename, function() { - that.currentSize = 0; - that.rolling = emptyRollingQueue(); - if (that.rolling) { - process.nextTick(function() { that.initRolling(); }); - } - }); -}; - -BaseRollingFileStream.prototype.write = function(data, encoding) { - if (this.rolling) { - this.writesWhileRolling.push({ data: data, encoding: encoding }); - return false; - } else { - var canWrite = BaseRollingFileStream.super_.prototype.write.call(this, data, encoding); - this.currentSize += data.length; - debug('current size = ' + this.currentSize); - if (this.shouldRoll()) { - this.initRolling(); - } - return canWrite; - } -}; - -BaseRollingFileStream.prototype.shouldRoll = function() { - return false; // default behaviour is never to roll -}; - -BaseRollingFileStream.prototype.roll = function(filename, callback) { - callback(); // default behaviour is not to do anything -}; - - -function RollingFileStream (filename, size, backups, options) { - this.size = size; - this.backups = backups || 1; - - function throwErrorIfArgumentsAreNotValid() { - if (!filename || !size || size <= 0) { - throw new Error("You must specify a filename and file size"); - } - } - - throwErrorIfArgumentsAreNotValid(); - - RollingFileStream.super_.call(this, filename, options); -} -util.inherits(RollingFileStream, BaseRollingFileStream); - -RollingFileStream.prototype.shouldRoll = function() { - return this.currentSize >= this.size; -}; - -RollingFileStream.prototype.roll = function(filename, callback) { - var that = this, - nameMatcher = new RegExp('^' + path.basename(filename)); - - function justTheseFiles (item) { - return nameMatcher.test(item); - } - - function index(filename_) { - return parseInt(filename_.substring((path.basename(filename) + '.').length), 10) || 0; - } - - function byIndex(a, b) { - if (index(a) > index(b)) { - return 1; - } else if (index(a) < index(b) ) { - return -1; - } else { - return 0; - } - } - - function increaseFileIndex (fileToRename, cb) { - var idx = index(fileToRename); - debug('Index of ' + fileToRename + ' is ' + idx); - if (idx < that.backups) { - //on windows, you can get a EEXIST error if you rename a file to an existing file - //so, we'll try to delete the file we're renaming to first - fs.unlink(filename + '.' + (idx+1), function (err) { - //ignore err: if we could not delete, it's most likely that it doesn't exist - debug('Renaming ' + fileToRename + ' -> ' + filename + '.' + (idx+1)); - fs.rename(path.join(path.dirname(filename), fileToRename), filename + '.' + (idx + 1), cb); - }); - } else { - cb(); - } - } - - function renameTheFiles(cb) { - //roll the backups (rename file.n to file.n+1, where n <= numBackups) - debug("Renaming the old files"); - fs.readdir(path.dirname(filename), function (err, files) { - async.forEachSeries( - files.filter(justTheseFiles).sort(byIndex).reverse(), - increaseFileIndex, - cb - ); - }); - } - - function openANewFile(cb) { - debug("Opening a new file"); - fs.open( - filename, - that.options.flags, - that.options.mode, - function (err, fd) { - debug("opened new file"); - var oldLogFileFD = that.fd; - that.fd = fd; - that.writable = true; - fs.close(oldLogFileFD, cb); - } - ); - } - - debug("Starting roll"); - debug("Queueing up data until we've finished rolling"); - debug("Flushing underlying stream"); - this.flush(); - - async.series([ - renameTheFiles, - openANewFile - ], callback); - -}; - - -exports.BaseRollingFileStream = BaseRollingFileStream; -exports.RollingFileStream = RollingFileStream; -exports.BufferedWriteStream = BufferedWriteStream; diff --git a/lib/streams/BaseRollingFileStream.js b/lib/streams/BaseRollingFileStream.js new file mode 100644 index 0000000..64401b2 --- /dev/null +++ b/lib/streams/BaseRollingFileStream.js @@ -0,0 +1,91 @@ +var fs = require('fs'), + util = require('util'); + +function debug(message) { +// util.debug(message); +// console.log(message); +} + +module.exports = BaseRollingFileStream; + +function BaseRollingFileStream(filename, options) { + this.filename = filename; + this.options = options || { encoding: 'utf8', mode: 0644, flags: 'a' }; + this.rolling = false; + this.writesWhileRolling = []; + this.currentSize = 0; + + function currentFileSize(file) { + var fileSize = 0; + try { + fileSize = fs.statSync(file).size; + } catch (e) { + // file does not exist + } + return fileSize; + } + + function throwErrorIfArgumentsAreNotValid() { + if (!filename) { + throw new Error("You must specify a filename"); + } + } + + throwErrorIfArgumentsAreNotValid(); + + BaseRollingFileStream.super_.call(this, this.filename, this.options); + this.currentSize = currentFileSize(this.filename); +} +util.inherits(BaseRollingFileStream, fs.FileWriteStream); + +BaseRollingFileStream.prototype.initRolling = function() { + var that = this; + + function emptyRollingQueue() { + debug("emptying the rolling queue"); + var toWrite; + while ((toWrite = that.writesWhileRolling.shift())) { + BaseRollingFileStream.super_.prototype.write.call(that, toWrite.data, toWrite.encoding); + that.currentSize += toWrite.data.length; + if (that.shouldRoll()) { + that.flush(); + return true; + } + } + that.flush(); + return false; + } + + this.rolling = true; + this.roll(this.filename, function() { + that.currentSize = 0; + that.rolling = emptyRollingQueue(); + if (that.rolling) { + process.nextTick(function() { that.initRolling(); }); + } + }); +}; + +BaseRollingFileStream.prototype.write = function(data, encoding) { + if (this.rolling) { + this.writesWhileRolling.push({ data: data, encoding: encoding }); + return false; + } else { + var canWrite = BaseRollingFileStream.super_.prototype.write.call(this, data, encoding); + this.currentSize += data.length; + debug('current size = ' + this.currentSize); + if (this.shouldRoll()) { + this.initRolling(); + } + return canWrite; + } +}; + +BaseRollingFileStream.prototype.shouldRoll = function() { + return false; // default behaviour is never to roll +}; + +BaseRollingFileStream.prototype.roll = function(filename, callback) { + callback(); // default behaviour is not to do anything +}; + diff --git a/lib/streams/BufferedWriteStream.js b/lib/streams/BufferedWriteStream.js new file mode 100644 index 0000000..d10c8c7 --- /dev/null +++ b/lib/streams/BufferedWriteStream.js @@ -0,0 +1,77 @@ +var events = require('events'), + util = require('util'); + +module.exports = BufferedWriteStream; + +function BufferedWriteStream(stream) { + var that = this; + this.stream = stream; + this.buffer = []; + this.canWrite = false; + this.bytes = 0; + + this.stream.on("open", function() { + that.canWrite = true; + that.flushBuffer(); + }); + + this.stream.on("error", function (err) { + that.emit("error", err); + }); + + this.stream.on("drain", function() { + that.canWrite = true; + that.flushBuffer(); + }); +} + +util.inherits(BufferedWriteStream, events.EventEmitter); + +Object.defineProperty( + BufferedWriteStream.prototype, + "fd", + { + get: function() { return this.stream.fd; }, + set: function(newFd) { + this.stream.fd = newFd; + this.bytes = 0; + } + } +); + +Object.defineProperty( + BufferedWriteStream.prototype, + "bytesWritten", + { + get: function() { return this.bytes; } + } +); + +BufferedWriteStream.prototype.write = function(data, encoding) { + this.buffer.push({ data: data, encoding: encoding }); + this.flushBuffer(); +}; + +BufferedWriteStream.prototype.end = function(data, encoding) { + if (data) { + this.buffer.push({ data: data, encoding: encoding }); + } + this.flushBufferEvenIfCannotWrite(); +}; + +BufferedWriteStream.prototype.writeToStream = function(toWrite) { + this.bytes += toWrite.data.length; + this.canWrite = this.stream.write(toWrite.data, toWrite.encoding); +}; + +BufferedWriteStream.prototype.flushBufferEvenIfCannotWrite = function() { + while (this.buffer.length > 0) { + this.writeToStream(this.buffer.shift()); + } +}; + +BufferedWriteStream.prototype.flushBuffer = function() { + while (this.buffer.length > 0 && this.canWrite) { + this.writeToStream(this.buffer.shift()); + } +}; diff --git a/lib/streams/DateRollingFileStream.js b/lib/streams/DateRollingFileStream.js new file mode 100644 index 0000000..abb3282 --- /dev/null +++ b/lib/streams/DateRollingFileStream.js @@ -0,0 +1,16 @@ +var BaseRollingFileStream = require('./BaseRollingFileStream'), + util = require('util'); + +module.exports = DateRollingFileStream; + +function DateRollingFileStream(filename, pattern, options) { + if (typeof(pattern) === 'object') { + options = pattern; + pattern = null; + } + this.pattern = pattern || 'yyyy-mm-dd'; + + DateRollingFileStream.super_.call(this, filename, options); +} + +util.inherits(DateRollingFileStream, BaseRollingFileStream); diff --git a/lib/streams/RollingFileStream.js b/lib/streams/RollingFileStream.js new file mode 100644 index 0000000..b8ea244 --- /dev/null +++ b/lib/streams/RollingFileStream.js @@ -0,0 +1,110 @@ +var BaseRollingFileStream = require('./BaseRollingFileStream'), + util = require('util'), + path = require('path'), + fs = require('fs'), + async = require('async'); + +function debug(message) { +// util.debug(message); +// console.log(message); +} + +module.exports = RollingFileStream; + +function RollingFileStream (filename, size, backups, options) { + this.size = size; + this.backups = backups || 1; + + function throwErrorIfArgumentsAreNotValid() { + if (!filename || !size || size <= 0) { + throw new Error("You must specify a filename and file size"); + } + } + + throwErrorIfArgumentsAreNotValid(); + + RollingFileStream.super_.call(this, filename, options); +} +util.inherits(RollingFileStream, BaseRollingFileStream); + +RollingFileStream.prototype.shouldRoll = function() { + return this.currentSize >= this.size; +}; + +RollingFileStream.prototype.roll = function(filename, callback) { + var that = this, + nameMatcher = new RegExp('^' + path.basename(filename)); + + function justTheseFiles (item) { + return nameMatcher.test(item); + } + + function index(filename_) { + return parseInt(filename_.substring((path.basename(filename) + '.').length), 10) || 0; + } + + function byIndex(a, b) { + if (index(a) > index(b)) { + return 1; + } else if (index(a) < index(b) ) { + return -1; + } else { + return 0; + } + } + + function increaseFileIndex (fileToRename, cb) { + var idx = index(fileToRename); + debug('Index of ' + fileToRename + ' is ' + idx); + if (idx < that.backups) { + //on windows, you can get a EEXIST error if you rename a file to an existing file + //so, we'll try to delete the file we're renaming to first + fs.unlink(filename + '.' + (idx+1), function (err) { + //ignore err: if we could not delete, it's most likely that it doesn't exist + debug('Renaming ' + fileToRename + ' -> ' + filename + '.' + (idx+1)); + fs.rename(path.join(path.dirname(filename), fileToRename), filename + '.' + (idx + 1), cb); + }); + } else { + cb(); + } + } + + function renameTheFiles(cb) { + //roll the backups (rename file.n to file.n+1, where n <= numBackups) + debug("Renaming the old files"); + fs.readdir(path.dirname(filename), function (err, files) { + async.forEachSeries( + files.filter(justTheseFiles).sort(byIndex).reverse(), + increaseFileIndex, + cb + ); + }); + } + + function openANewFile(cb) { + debug("Opening a new file"); + fs.open( + filename, + that.options.flags, + that.options.mode, + function (err, fd) { + debug("opened new file"); + var oldLogFileFD = that.fd; + that.fd = fd; + that.writable = true; + fs.close(oldLogFileFD, cb); + } + ); + } + + debug("Starting roll"); + debug("Queueing up data until we've finished rolling"); + debug("Flushing underlying stream"); + this.flush(); + + async.series([ + renameTheFiles, + openANewFile + ], callback); + +}; diff --git a/lib/streams/index.js b/lib/streams/index.js new file mode 100644 index 0000000..7f6b043 --- /dev/null +++ b/lib/streams/index.js @@ -0,0 +1,3 @@ +exports.BufferedWriteStream = require('./BufferedWriteStream'); +exports.RollingFileStream = require('./RollingFileStream'); +exports.DateRollingFileStream = require('./DateRollingFileStream'); diff --git a/test/streams/DateRollingFileStream-test.js b/test/streams/DateRollingFileStream-test.js new file mode 100644 index 0000000..b32ffbf --- /dev/null +++ b/test/streams/DateRollingFileStream-test.js @@ -0,0 +1,49 @@ +var vows = require('vows'), + assert = require('assert'), + fs = require('fs'), + DateRollingFileStream = require('../../lib/streams').DateRollingFileStream; + +vows.describe('DateRollingFileStream').addBatch({ + 'arguments': { + topic: new DateRollingFileStream('test-date-rolling-file-stream', 'yyyy-mm-dd.hh'), + + 'should take a filename and a pattern and return a FileWriteStream': function(stream) { + assert.equal(stream.filename, 'test-date-rolling-file-stream'); + assert.equal(stream.pattern, 'yyyy-mm-dd.hh'); + assert.instanceOf(stream, fs.FileWriteStream); + }, + 'with default settings for the underlying stream': function(stream) { + assert.equal(stream.mode, 420); + assert.equal(stream.flags, 'a'); + assert.equal(stream.encoding, 'utf8'); + } + }, + + 'default arguments': { + topic: new DateRollingFileStream('test-date-rolling-file-stream'), + + 'pattern should be yyyy-mm-dd': function(stream) { + assert.equal(stream.pattern, 'yyyy-mm-dd'); + } + }, + + 'with stream arguments': { + topic: new DateRollingFileStream('test-rolling-file-stream', 'yyyy-mm-dd', { mode: 0666 }), + + 'should pass them to the underlying stream': function(stream) { + assert.equal(stream.mode, 0666); + } + }, + + 'with stream arguments but no pattern': { + topic: new DateRollingFileStream('test-rolling-file-stream', { mode: 0666 }), + + 'should pass them to the underlying stream': function(stream) { + assert.equal(stream.mode, 0666); + }, + 'should use default pattern': function(stream) { + assert.equal(stream.pattern, 'yyyy-mm-dd'); + } + } + +}).exportTo(module); diff --git a/test/bufferedStream-test.js b/test/streams/bufferedStream-test.js similarity index 97% rename from test/bufferedStream-test.js rename to test/streams/bufferedStream-test.js index 4755b28..03dd789 100644 --- a/test/bufferedStream-test.js +++ b/test/streams/bufferedStream-test.js @@ -1,7 +1,7 @@ var vows = require('vows') , assert = require('assert') , events = require('events') -, BufferedWriteStream = require('../lib/streams').BufferedWriteStream; +, BufferedWriteStream = require('../../lib/streams').BufferedWriteStream; function FakeStream() { this.writes = []; @@ -127,4 +127,4 @@ vows.describe('BufferedWriteStream').addBatch({ } } -}).exportTo(module); \ No newline at end of file +}).exportTo(module); diff --git a/test/rollingFileStream-test.js b/test/streams/rollingFileStream-test.js similarity index 98% rename from test/rollingFileStream-test.js rename to test/streams/rollingFileStream-test.js index 2ce6b77..a84c3c0 100644 --- a/test/rollingFileStream-test.js +++ b/test/streams/rollingFileStream-test.js @@ -2,7 +2,7 @@ var vows = require('vows') , assert = require('assert') , events = require('events') , fs = require('fs') -, RollingFileStream = require('../lib/streams').RollingFileStream; +, RollingFileStream = require('../../lib/streams').RollingFileStream; function remove(filename) { try {