From 05c4c59c2044592bff6b4460da8ebfa9bfbfec0e Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Thu, 22 Dec 2011 14:36:30 +1100 Subject: [PATCH] Refactored streams to make it easier to write other rolling based file appenders. --- lib/streams.js | 194 ++++++++++++++++++++++++++++++------------------- 1 file changed, 119 insertions(+), 75 deletions(-) diff --git a/lib/streams.js b/lib/streams.js index 26165d5..8ead2b6 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -1,8 +1,13 @@ -var util = require('util') -, fs = require('fs') -, path = require('path') -, events = require('events') -, async = require('async'); +var util = require('util'), +fs = require('fs'), +path = require('path'), +events = require('events'), +async = require('async'); + +function debug(message) { +// util.debug(message); + console.log(message); +} function BufferedWriteStream(stream) { var that = this; @@ -51,89 +56,144 @@ Object.defineProperty( BufferedWriteStream.prototype.write = function(data, encoding) { this.buffer.push({ data: data, encoding: encoding }); this.flushBuffer(); -} +}; BufferedWriteStream.prototype.end = function(data, encoding) { if (data) { this.buffer.push({ data: data, encoding: encoding }); } this.flushBufferEvenIfCannotWrite(); -} +}; BufferedWriteStream.prototype.writeToStream = function(toWrite) { this.bytes += toWrite.data.length; this.canWrite = this.stream.write(toWrite.data, toWrite.encoding); -} +}; BufferedWriteStream.prototype.flushBufferEvenIfCannotWrite = function() { while (this.buffer.length > 0) { this.writeToStream(this.buffer.shift()); } -} +}; BufferedWriteStream.prototype.flushBuffer = function() { while (this.buffer.length > 0 && this.canWrite) { this.writeToStream(this.buffer.shift()); } -} +}; -function RollingFileStream (filename, size, backups, options) { +function BaseRollingFileStream(filename, options) { this.filename = filename; - this.size = size; - this.backups = backups || 1; - this.options = options || { encoding: "utf8", mode: 0644, flags: 'a' }; + this.options = options || { encoding: 'utf8', mode: 0644, flags: 'a' }; this.rolling = false; this.writesWhileRolling = []; - this.bytesQueued = 0; - - throwErrorIfArgumentsAreNotValid(); - - RollingFileStream.super_.call(this, this.filename, this.options); - this.bytesQueued = currentFileSize(this.filename); + this.currentSize = 0; function currentFileSize(file) { var fileSize = 0; try { fileSize = fs.statSync(file).size; } catch (e) { - //file does not exist + // file does not exist } return fileSize; } + function throwErrorIfArgumentsAreNotValid() { + if (!filename) { + throw new Error("You must specify a filename"); + } + } + + throwErrorIfArgumentsAreNotValid(); + + BaseRollingFileStream.super_.call(this, this.filename, this.options); + this.currentSize = currentFileSize(this.filename); +} +util.inherits(BaseRollingFileStream, fs.FileWriteStream); + +BaseRollingFileStream.prototype.initRolling = function() { + var that = this; + + function emptyRollingQueue() { + debug("emptying the rolling queue"); + var toWrite; + while ((toWrite = that.writesWhileRolling.shift())) { + BaseRollingFileStream.super_.prototype.write.call(that, toWrite.data, toWrite.encoding); + that.currentSize += toWrite.data.length; + if (that.shouldRoll()) { + that.flush(); + return true; + } + } + that.flush(); + return false; + } + + this.rolling = true; + this.roll(this.filename, function() { + that.currentSize = 0; + that.rolling = emptyRollingQueue(); + if (that.rolling) { + process.nextTick(function() { that.initRolling(); }); + } + }); +}; + +BaseRollingFileStream.prototype.write = function(data, encoding) { + if (this.rolling) { + this.writesWhileRolling.push({ data: data, encoding: encoding }); + return false; + } else { + var canWrite = BaseRollingFileStream.super_.prototype.write.call(this, data, encoding); + this.currentSize += data.length; + debug('current size = ' + this.currentSize); + if (this.shouldRoll()) { + this.initRolling(); + } + return canWrite; + } +}; + +BaseRollingFileStream.prototype.shouldRoll = function() { + return false; // default behaviour is never to roll +}; + +BaseRollingFileStream.prototype.roll = function(filename, callback) { + callback(); // default behaviour is not to do anything +}; + + +function RollingFileStream (filename, size, backups, options) { + this.size = size; + this.backups = backups || 1; + function throwErrorIfArgumentsAreNotValid() { if (!filename || !size || size <= 0) { throw new Error("You must specify a filename and file size"); } } -} -util.inherits(RollingFileStream, fs.FileWriteStream); -RollingFileStream.prototype.write = function(data, encoding) { - if (this.rolling) { - this.writesWhileRolling.push({ data: data, encoding: encoding }); - return false; - } else { - var canWrite = RollingFileStream.super_.prototype.write.call(this, data, encoding); - console.log("bytesQueued: %d, max: %d", this.bytesQueued, this.size); - this.bytesQueued += data.length; - if (this.bytesQueued >= this.size) { - this.roll(); - } - return canWrite; - } -} + throwErrorIfArgumentsAreNotValid(); -RollingFileStream.prototype.roll = function () { + RollingFileStream.super_.call(this, filename, options); +} +util.inherits(RollingFileStream, BaseRollingFileStream); + +RollingFileStream.prototype.shouldRoll = function() { + return this.currentSize >= this.size; +}; + +RollingFileStream.prototype.roll = function(filename, callback) { var that = this, - nameMatcher = new RegExp('^' + path.basename(this.filename)); + nameMatcher = new RegExp('^' + path.basename(filename)); function justTheseFiles (item) { return nameMatcher.test(item); } - function index(filename) { - return parseInt(filename.substring((path.basename(that.filename) + '.').length), 10) || 0; + function index(filename_) { + return parseInt(filename_.substring((path.basename(filename) + '.').length), 10) || 0; } function byIndex(a, b) { @@ -148,14 +208,14 @@ RollingFileStream.prototype.roll = function () { function increaseFileIndex (fileToRename, cb) { var idx = index(fileToRename); - console.log("Index of %s is %d", fileToRename, idx); + debug('Index of ' + fileToRename + ' is ' + idx); if (idx < that.backups) { //on windows, you can get a EEXIST error if you rename a file to an existing file //so, we'll try to delete the file we're renaming to first - fs.unlink(that.filename + '.' + (idx+1), function (err) { + fs.unlink(filename + '.' + (idx+1), function (err) { //ignore err: if we could not delete, it's most likely that it doesn't exist - console.log("Renaming %s -> %s", fileToRename, that.filename + '.' + (idx+1)); - fs.rename(path.join(path.dirname(that.filename), fileToRename), that.filename + '.' + (idx + 1), cb); + debug('Renaming ' + fileToRename + ' -> ' + filename + '.' + (idx+1)); + fs.rename(path.join(path.dirname(filename), fileToRename), filename + '.' + (idx + 1), cb); }); } else { cb(); @@ -164,8 +224,8 @@ RollingFileStream.prototype.roll = function () { function renameTheFiles(cb) { //roll the backups (rename file.n to file.n+1, where n <= numBackups) - console.log("Renaming the old files"); - fs.readdir(path.dirname(that.filename), function (err, files) { + debug("Renaming the old files"); + fs.readdir(path.dirname(filename), function (err, files) { async.forEachSeries( files.filter(justTheseFiles).sort(byIndex).reverse(), increaseFileIndex, @@ -175,50 +235,34 @@ RollingFileStream.prototype.roll = function () { } function openANewFile(cb) { - console.log("Opening a new file"); + debug("Opening a new file"); fs.open( - that.filename, + filename, that.options.flags, that.options.mode, function (err, fd) { - console.log("opened new file"); + debug("opened new file"); var oldLogFileFD = that.fd; that.fd = fd; that.writable = true; - fs.close(oldLogFileFD, function() { - that.bytesQueued = 0; - that.bytesWritten = 0; - cb(); - }); + fs.close(oldLogFileFD, cb); } ); } - function emptyRollingQueue(cb) { - console.log("emptying the rolling queue"); - var toWrite; - while ((toWrite = that.writesWhileRolling.shift())) { - RollingFileStream.super_.prototype.write.call(that, toWrite.data, toWrite.encoding); - that.bytesQueued += toWrite.data.length; - } - that.rolling = false; - that.flush(); - cb(); - } - - console.log("Starting roll"); - console.log("Queueing up data until we've finished rolling"); - this.rolling = true; - console.log("Flushing underlying stream"); + debug("Starting roll"); + debug("Queueing up data until we've finished rolling"); + debug("Flushing underlying stream"); this.flush(); async.series([ renameTheFiles, - openANewFile, - emptyRollingQueue - ]); + openANewFile + ], callback); -} +}; + +exports.BaseRollingFileStream = BaseRollingFileStream; exports.RollingFileStream = RollingFileStream; exports.BufferedWriteStream = BufferedWriteStream; \ No newline at end of file