// 853 lines
// 20 KiB
// JavaScript

"use strict";
require("../__util__/test-init");
var fs = require("fs");
var nodePath = require("path");
var chai = require("chai");
chai.config.includeStack = true;
var expect = chai.expect;
var fsReadOptions = { encoding: "utf8" };
var AsyncStream = require("marko/runtime/html/AsyncStream");
/* DEBUG INFO:
===========
Want pretty debug statements and diagrams?
Uncomment the following statement: */
// var asyncWriter = require('../debug');
/**
 * Test helper that builds an AsyncStream from a loosely-typed argument.
 *
 * - No argument (`undefined`) or `null`: returns `new AsyncStream()`.
 * - A value exposing a `write` function: it is handed to the constructor as
 *   the second argument (the first argument is `null`).
 * - Any other object: treated as an options bag. `options.global` is passed
 *   as the first constructor argument and a truthy `options.name` is copied
 *   onto the created stream as `name`.
 * - Anything else: returns `new AsyncStream()`.
 *
 * @param {object} [options] Writable-like object or `{ name, global }` bag.
 * @returns {AsyncStream} The newly constructed stream.
 */
function createAsyncStream(options) {
  // Guard first: reading `options.write` on `undefined`/`null` would throw a
  // TypeError and make the default branch at the bottom unreachable.
  if (options == null) {
    return new AsyncStream();
  }
  if (typeof options.write === "function") {
    return new AsyncStream(null, options);
  }
  if (typeof options === "object") {
    var out = new AsyncStream(options.global);
    if (options.name) {
      out.name = options.name;
    }
    return out;
  }
  return new AsyncStream();
}
describe("AsyncStream", function () {
// Per-test setup hook. It currently does nothing except signal completion;
// the require-cache reset below is commented out.
beforeEach(function (ready) {
  // for (var k in require.cache) {
  //   if (require.cache.hasOwnProperty(k)) {
  //     delete require.cache[k];
  //   }
  // }
  ready();
});
// Four back-to-back synchronous writes must come out concatenated in call order.
it("should render a series of sync calls correctly", function (done) {
  const stream = new AsyncStream();
  ["1", "2", "3", "4"].forEach(function (chunk) {
    stream.write(chunk);
  });
  stream.end();
  stream.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("1234");
    done();
  });
});
// The value returned by `end()` is used as a thenable; the test returns it so
// Mocha waits for the assertions inside the fulfilment handler.
it("should resolve promise upon finish", function () {
  const stream = new AsyncStream();
  stream.write("1");
  stream.write("2");
  const finished = stream.end();
  return finished.then(function (result) {
    const rendered = result.getOutput();
    expect(rendered).to.equal("12");
    expect(result.toString()).to.equal("12");
  });
});
// Two async fragments complete out of order (the second after 10ms, the first
// after 100ms); the expected output still follows the order they were opened.
it("should render a series of sync and async calls correctly", function (done) {
  // Writes `text` into `fragment` and ends it once `delayMs` has elapsed.
  function writeLater(fragment, text, delayMs) {
    setTimeout(() => {
      fragment.write(text);
      fragment.end();
    }, delayMs);
  }

  const root = new AsyncStream();
  root.write("1");
  const slowFragment = root.beginAsync();
  writeLater(slowFragment, "2", 100);
  root.write("3");
  const fastFragment = root.beginAsync();
  writeLater(fastFragment, "4", 10);
  root.end();
  root.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("1234");
    done();
  });
});
// A single async fragment opened between two sync writes keeps its slot:
// "2" is written 10ms later but must appear between "1" and "3".
it("should allow an async fragment to complete synchronously", function (done) {
  const stream = new AsyncStream();
  stream.write("1");
  const fragment = stream.beginAsync();
  setTimeout(() => {
    fragment.write("2");
    fragment.end();
  }, 10);
  stream.write("3");
  stream.end();
  stream.on("finish", function onFinish(result) {
    const rendered = result.getOutput();
    expect(rendered).to.equal("123");
    done();
  });
});
// The fragment passes its data straight to `end("2")` instead of calling
// `write` first; the output is expected to contain that data in place.
it("should allow the async callback to provide data", function (done) {
  const stream = new AsyncStream();
  stream.write("1");
  const fragment = stream.beginAsync();
  setTimeout(() => {
    fragment.end("2");
  }, 10);
  stream.write("3");
  stream.end();
  stream.on("finish", function onFinish(result) {
    const rendered = result.getOutput();
    expect(rendered).to.equal("123");
    done();
  });
});
// The fragment is opened with `timeout: 100` but only writes after 200ms.
// The test expects exactly one error and an output without the late "2".
it("should handle timeouts correctly", function (done) {
  const stream = new AsyncStream();
  const seenErrors = [];
  stream.on("error", (err) => {
    seenErrors.push(err);
  });
  stream.write("1");
  const lateFragment = stream.beginAsync({ timeout: 100, name: "test" });
  setTimeout(() => {
    lateFragment.write("2");
    lateFragment.end();
  }, 200);
  stream.write("3");
  stream.end();
  stream.on("finish", (result) => {
    expect(seenErrors.length).to.equal(1);
    expect(result.getOutput()).to.equal("13");
    done();
  });
});
// Async fragments that themselves open further async fragments: the output
// must follow the order in which fragments were opened and written.
it("should render nested async calls correctly", function (done) {
  // Writes `text` into `fragment` and ends it after a 10ms delay.
  function finishSoon(fragment, text) {
    setTimeout(() => {
      fragment.write(text);
      fragment.end();
    }, 10);
  }

  const root = new AsyncStream();
  root.write("1");
  const second = root.beginAsync();
  setTimeout(() => {
    second.write("2a");
    finishSoon(second.beginAsync(), "2b");
    second.write("2c");
    second.end();
  }, 10);
  root.write("3");
  const fourth = root.beginAsync();
  setTimeout(() => {
    // Two sibling nested fragments are opened before the parent writes "4c".
    finishSoon(fourth.beginAsync(), "4a");
    finishSoon(fourth.beginAsync(), "4b");
    fourth.write("4c");
    fourth.end();
  }, 10);
  root.end();
  root.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("12a2b2c34a4b4c");
    done();
  });
});
// Everything happens synchronously, but the writes and `end()` calls are
// deliberately interleaved across parent and child fragments. The expected
// output is still "1234567".
it("should handle odd execution ordering", function (done) {
  const root = createAsyncStream({ name: "outA" });
  root.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("1234567");
    done();
  });
  root.write("1");
  const fragB = root.beginAsync({ name: "outB" });
  root.write("3");
  const fragC = root.beginAsync({ name: "outC" });
  const fragD = fragC.beginAsync({ name: "outD" });
  fragB.write("2");
  fragB.end();
  root.write("7");
  fragC.write("5");
  fragD.write("4");
  fragD.end();
  fragC.write("6");
  fragC.end();
  root.end();
});
// The fragment reports an error instead of writing. The test expects a single
// "error" event and an output made of the surrounding sync writes only.
it("should handle sync errors correctly", function (done) {
  const stream = new AsyncStream();
  const seenErrors = [];
  stream.on("error", (err) => {
    seenErrors.push(err);
  });
  stream.write("1");
  const failing = stream.beginAsync();
  setTimeout(() => {
    failing.error(new Error("test"));
  }, 10);
  stream.write("3");
  stream.end();
  stream.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(seenErrors.length).to.equal(1);
    expect(rendered).to.equal("13");
    done();
  });
});
// An error raised by a fragment must reach both the "error" listener and the
// rejection handler attached to the value returned by `end()`.
it("should catch error in promise catch", function (done) {
  const stream = new AsyncStream();
  const seenErrors = [];
  stream.on("error", (err) => {
    seenErrors.push(err);
  });
  stream.write("1");
  const failing = stream.beginAsync();
  setTimeout(() => {
    failing.error(new Error("test"));
  }, 10);
  stream.write("3");
  const finished = stream.end();
  finished.catch(function (err) {
    expect(seenErrors.length).to.equal(1);
    expect(err).to.be.an("error");
    done();
  });
});
// No explicit "error" listener is registered here; the only error handling is
// the rejection handler attached through `out.catch(...)`.
it("should catch error in promise catch if `error` listener only set inside mixin", function (done) {
  const out = new AsyncStream();
  // Sentinel returned by the rejection handler so the next link in the chain
  // can tell "the error was caught" apart from "the stream resolved".
  const caught = {};
  out
    .catch((err) => {
      expect(err).to.be.an("error");
      expect(out.getOutput()).to.equal("1");
      return caught;
    })
    .then((value) => {
      // A `.then` placed after `.catch` also runs when the catch handler
      // succeeds, so only fail when the chain got here without an error.
      if (value !== caught) {
        throw new Error("Should not get here!");
      }
    })
    // Route success and every failure (including failed expectations above)
    // to Mocha instead of leaving a rejected promise unhandled.
    .then(done, done);
  out.write("1");
  out.error(new Error("test"));
  out.write("2");
});
// `on(...)` and `write(...)` are chained off the constructor call and off each
// other, so the test relies on each of them returning a chainable stream.
it("should support chaining", function (done) {
  const seenErrors = [];
  const recordError = function (err) {
    seenErrors.push(err);
  };
  const verify = function (result) {
    const rendered = result.getOutput();
    expect(seenErrors.length).to.equal(1);
    expect(rendered).to.equal("13");
    done();
  };
  const out = new AsyncStream()
    .on("error", recordError)
    .on("finish", verify)
    .write("1");
  const failing = out.beginAsync();
  setTimeout(() => {
    failing.error(new Error("test"));
  }, 10);
  out.write("3").end();
});
// Output is routed into a through2 transform that records every chunk it
// receives; the recorded text must be "123" with no errors.
it("should support writing to a through2 stream", function (done) {
  let received = "";
  const sink = require("through2")(
    function write(chunk, encoding, callback) {
      received += chunk;
      callback(null, chunk);
    },
  );
  const seenErrors = [];
  const out = createAsyncStream(sink)
    .on("error", (err) => {
      seenErrors.push(err);
    })
    .on("finish", () => {
      expect(seenErrors.length).to.equal(0);
      expect(received).to.equal("123");
      done();
    })
    .write("1");
  const fragment = out.beginAsync();
  setTimeout(() => {
    fragment.write("2");
    fragment.end();
  }, 10);
  out.write("3").end();
});
// Same scenario as the through2 test, but with a `through` stream and with the
// assertions hooked to the "end" event.
it("should support writing to a through stream", function (done) {
  let received = "";
  // Must stay a regular function: `this.queue` is called on the stream.
  const sink = require("through")(function write(chunk) {
    received += chunk;
    this.queue(chunk);
  });
  const seenErrors = [];
  const out = createAsyncStream(sink)
    .on("error", (err) => {
      seenErrors.push(err);
    })
    .on("end", () => {
      expect(seenErrors.length).to.equal(0);
      expect(received).to.equal("123");
      done();
    })
    .write("1");
  const fragment = out.beginAsync();
  setTimeout(() => {
    fragment.write("2");
    fragment.end();
  }, 10);
  out.write("3").end();
});
// Output is written to a real file next to this test; once the file stream
// closes, the file content is read back and compared.
it("should support writing to a file output stream", function (done) {
  var outFile = nodePath.join(__dirname, "test.out");
  var fileStream = fs.createWriteStream(outFile, fsReadOptions);
  var errors = [];
  fileStream.on("close", function () {
    var output = fs.readFileSync(outFile, fsReadOptions);
    // Remove the scratch file before asserting so that a failing expectation
    // does not leave `test.out` behind in the test directory.
    fs.unlinkSync(outFile);
    expect(errors.length).to.equal(0);
    expect(output).to.equal("123");
    done();
  });
  var out = createAsyncStream(fileStream)
    .on("error", function (e) {
      errors.push(e);
    })
    .on("finish", function () {})
    .write("1");
  var asyncOut = out.beginAsync();
  setTimeout(function () {
    asyncOut.write("2");
    asyncOut.end();
  }, 10);
  out.write("3").end();
});
// A file read stream is piped directly into an async fragment; the expected
// output shows the fixture text between the two sync writes.
it("should support piping to an async writer", function (done) {
  const stream = new AsyncStream();
  stream.write("1");
  const fragment = stream.beginAsync();
  const fixturePath = nodePath.join(__dirname, "fixtures/hello.txt");
  const reader = fs.createReadStream(fixturePath, fsReadOptions);
  reader.pipe(fragment);
  stream.write("2");
  stream.end();
  stream.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("1Hello World2");
    done();
  });
});
// The AsyncStream writes into one `through` stream, which is piped into a
// second one that accumulates everything it receives.
it("should support a writer being piped to another stream", function (done) {
  const source = require("through")();
  let collected = "";
  const sink = require("through")(function write(str) {
    collected += str;
  });
  source.pipe(sink).on("error", (e) => {
    done(e);
  });
  const out = createAsyncStream(source);
  out.write("1");
  const fragment = out.beginAsync();
  const fixturePath = nodePath.join(__dirname, "fixtures/hello.txt");
  const reader = fs.createReadStream(fixturePath, fsReadOptions);
  reader.pipe(fragment);
  out.write("2");
  out.on("end", () => {
    expect(collected).to.equal("1Hello World2");
    done();
  });
  out.end();
});
// A fragment opened with `last: true` is filled in from a one-shot "last"
// listener; its text must still land between "1" and "3".
it("should allow an async fragment to flush last", function (done) {
  const stream = new AsyncStream();
  stream.write("1");
  const lastFragment = stream.beginAsync({ last: true });
  stream.once("last", () => {
    lastFragment.write("2");
    lastFragment.end();
  });
  stream.write("3");
  stream.end();
  stream.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("123");
    done();
  });
});
// Like the synchronous "flush last" test, but the fragment is completed 10ms
// after "last" fires. A separate counter checks that "last" fires only once.
it("should allow an async fragment to flush last asynchronously", function (done) {
  const stream = new AsyncStream();
  stream.write("1");
  const lastFragment = stream.beginAsync({ last: true });
  let timesLastFired = 0;
  stream.on("last", () => {
    timesLastFired++;
  });
  stream.once("last", () => {
    setTimeout(() => {
      lastFragment.write("2");
      lastFragment.end();
    }, 10);
  });
  stream.write("3");
  stream.end();
  stream.on("finish", (result) => {
    expect(timesLastFired).to.equal(1);
    const rendered = result.getOutput();
    expect(rendered).to.equal("123");
    done();
  });
});
// The wrapped PassThrough has its own "error" listener; the test passes as
// soon as that listener is invoked after `out.error("test")`.
it("should not crash the program if the underlying stream has an error listener", function (done) {
  const { PassThrough } = require("stream");
  const target = new PassThrough();
  target.on("error", () => {
    done();
  });
  const out = createAsyncStream(target);
  out.write("hello");
  out.error("test");
});
// Without an "error" listener on the wrapped PassThrough, `out.error("test")`
// is expected to throw synchronously.
it("should crash the program if the underlying stream does *not* have an error listener", function (done) {
  var stream = require("stream");
  var PassThrough = stream.PassThrough;
  var passthrough = new PassThrough();
  var out = createAsyncStream(passthrough);
  out.write("hello");
  var thrown = false;
  try {
    out.error("test");
  } catch (e) {
    thrown = true;
  }
  // Report outside the try block so the catch clause only ever sees the
  // exception from `out.error`, and fail with a real Error, not a string.
  if (thrown) {
    done();
  } else {
    done(new Error("uncaught exception expected"));
  }
});
// Two `onLast` callbacks are registered, each completing its own `last: true`
// fragment. The test checks that both ran, in registration order, and that
// the output is "12345".
it("should allow multiple onLast calls", function (done) {
  const stream = new AsyncStream();
  stream.write("1");
  const delayed = stream.beginAsync();
  setTimeout(() => {
    delayed.write("2");
    delayed.end();
  }, 20);
  const firstLast = stream.beginAsync({ last: true });
  let onLastCalls = 0;
  const callOrder = [];
  stream.onLast((next) => {
    onLastCalls++;
    callOrder.push("a");
    firstLast.write("3");
    firstLast.end();
    // The first callback hands over to the next one only after 10ms.
    setTimeout(next, 10);
  });
  const secondLast = stream.beginAsync({ last: true });
  stream.onLast((next) => {
    onLastCalls++;
    callOrder.push("b");
    secondLast.write("4");
    secondLast.end();
    next();
  });
  stream.write("5");
  stream.end();
  stream.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("12345");
    expect(onLastCalls).to.equal(2);
    expect(callOrder).to.deep.equal(["a", "b"]);
    done();
  });
});
// One fragment is opened with `timeout: 50` and never completed, while another
// writes "2" after 100ms. The sink must see "12" and a single error whose text
// contains "timed out".
it("should handle timeout errors correctly", function (done) {
  let received = "";
  const seenErrors = [];
  const sink = require("through")(function write(chunk) {
    received += chunk;
  })
    .on("error", (err) => {
      seenErrors.push(err);
    })
    .on("end", () => {
      expect(received).to.equal("12");
      expect(seenErrors.length).to.equal(1);
      expect(seenErrors[0].toString()).to.contain("timed out");
      done();
    });
  const out = createAsyncStream(sink);
  out.write("1");
  const slowFragment = out.beginAsync();
  out.beginAsync({ timeout: 50 });
  setTimeout(() => {
    slowFragment.write("2");
    slowFragment.end();
  }, 100);
  out.end();
});
// The first fragment has `timeout: 50` but writes after 100ms; the second
// writes after 50ms. The sink asserts that no chunk arrives once "end" has
// been seen and that the final text is "13".
it("should avoid writes after end (a)", function (done) {
  let received = "";
  const seenErrors = [];
  let hasEnded = false;
  const sink = require("through")(function write(chunk) {
    expect(hasEnded).to.equal(false);
    received += chunk;
  })
    .on("error", (err) => {
      seenErrors.push(err);
    })
    .on("end", () => {
      hasEnded = true;
      expect(received).to.equal("13");
      done();
    });
  const out = createAsyncStream(sink);
  out.write("1");
  const timedFragment = out.beginAsync({ timeout: 50 });
  setTimeout(() => {
    timedFragment.write("2");
    timedFragment.end();
  }, 100);
  const plainFragment = out.beginAsync();
  setTimeout(() => {
    plainFragment.write("3");
    plainFragment.end();
  }, 50);
  out.end();
});
// The first fragment writes "2" after 75ms; the second has `timeout: 50` yet
// writes after 100ms. The sink asserts that no chunk arrives once "end" has
// been seen and that the final text is "12".
it("should avoid writes after end (b)", function (done) {
  let received = "";
  const seenErrors = [];
  let hasEnded = false;
  const sink = require("through")(function write(chunk) {
    expect(hasEnded).to.equal(false);
    received += chunk;
  })
    .on("error", (err) => {
      seenErrors.push(err);
    })
    .on("end", () => {
      hasEnded = true;
      expect(received).to.equal("12");
      done();
    });
  const out = createAsyncStream(sink);
  out.write("1");
  const plainFragment = out.beginAsync();
  setTimeout(() => {
    plainFragment.write("2");
    plainFragment.end();
  }, 75);
  const timedFragment = out.beginAsync({ timeout: 50 });
  // The write below is scheduled for 100ms, i.e. after this fragment's 50ms
  // timeout, so it is attempted on a fragment that has already timed out.
  setTimeout(() => {
    timedFragment.write("3");
    timedFragment.end();
  }, 100);
  out.end();
});
// The first fragment writes "2" after 75ms; the second reports an error after
// 100ms and then still calls `end()`. The sink must see "12", exactly one
// error, and no chunk after "end".
it("should avoid writes after end (c)", function (done) {
  let received = "";
  const seenErrors = [];
  let hasEnded = false;
  const sink = require("through")(function write(chunk) {
    expect(hasEnded).to.equal(false);
    received += chunk;
  })
    .on("error", (err) => {
      seenErrors.push(err);
    })
    .on("end", () => {
      hasEnded = true;
      expect(received).to.equal("12");
      expect(seenErrors.length).to.equal(1);
      done();
    });
  const out = createAsyncStream(sink);
  out.write("1");
  const plainFragment = out.beginAsync();
  setTimeout(() => {
    plainFragment.write("2");
    plainFragment.end();
  }, 75);
  const failingFragment = out.beginAsync();
  // This fragment has no timeout: it raises an error explicitly and then
  // calls `end()` on itself afterwards.
  setTimeout(() => {
    failingFragment.error("TEST ERROR");
    failingFragment.end();
  }, 100);
  out.end();
});
// Two streams are created with the same `global` object. Each must emit
// "finish" exactly once, even though `end()` is called twice on the first.
it("should track finished correctly", function (done) {
  const sharedGlobal = {};
  const first = createAsyncStream({ global: sharedGlobal });
  const second = createAsyncStream({ global: sharedGlobal });

  // Flags `stream` as finished, fails on a repeated "finish", and completes
  // the test once both streams carry the flag.
  function watch(stream) {
    stream.on("finish", () => {
      if (stream.data.__finishedFlag) {
        return done(new Error("finished invoked multiple times!"));
      }
      stream.data.__finishedFlag = true;
      if (first.data.__finishedFlag && second.data.__finishedFlag) {
        done();
      }
    });
  }

  watch(first);
  watch(second);
  first.write("foo");
  first.end();
  first.end();
  second.write("bar");
  second.end();
});
// The top-level stream is only ended from inside a timer, after opening a
// fragment that itself completes 50ms later.
it("should end correctly if top-level is ended asynchronously", function (done) {
  const outer = new AsyncStream();
  outer.name = "outer";
  outer.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("123");
    done();
  });
  outer.write("1");
  setTimeout(() => {
    const inner = outer.beginAsync({ name: "inner" });
    setTimeout(() => {
      inner.write("2");
      inner.end();
    }, 50);
    outer.write("3");
    outer.end();
  }, 10);
});
// Same flow as the plain "ended asynchronously" test, but the stream is built
// through `createAsyncStream` with a custom `global` object.
it("should end correctly if top-level is ended asynchronously when providing custom globals", function (done) {
  const outer = createAsyncStream({ global: { foo: "bar" } });
  outer.name = "outer";
  outer.on("finish", (result) => {
    const rendered = result.getOutput();
    expect(rendered).to.equal("123");
    done();
  });
  outer.write("1");
  setTimeout(() => {
    const inner = outer.beginAsync({ name: "inner" });
    setTimeout(() => {
      inner.write("2");
      inner.end();
    }, 50);
    outer.write("3");
    outer.end();
  }, 10);
});
// `stream` on the root and on every fragment must be the very object passed to
// `createAsyncStream`: right after creation, inside the timers, and on "end".
it("should support out.stream for accessing the original stream", function (done) {
  const makeThrough = require("through");
  const target = makeThrough(function write() {});
  const out = createAsyncStream(target);
  expect(out.stream).to.equal(target);

  // Opens a fragment, checks its `stream` immediately and again just before
  // ending it after `delayMs`.
  function openFragment(delayMs) {
    const fragment = out.beginAsync();
    expect(fragment.stream).to.equal(target);
    setTimeout(() => {
      expect(fragment.stream).to.equal(target);
      fragment.end();
    }, delayMs);
    return fragment;
  }

  const slowFragment = openFragment(100);
  const fastFragment = openFragment(50);
  out.on("end", () => {
    expect(out.stream).to.equal(target);
    expect(slowFragment.stream).to.equal(target);
    expect(fastFragment.stream).to.equal(target);
    done();
  });
  out.end();
});
});