"use strict"; require("../__util__/test-init"); var fs = require("fs"); var nodePath = require("path"); var chai = require("chai"); chai.config.includeStack = true; var expect = chai.expect; var fsReadOptions = { encoding: "utf8" }; var AsyncStream = require("marko/runtime/html/AsyncStream"); /* DEBUG INFO: =========== Want pretty debug statements and diagrams? Uncomment the following statement: */ // var asyncWriter = require('../debug'); function createAsyncStream(options) { if (typeof options.write === "function") { return new AsyncStream(null, options); } else if (typeof options === "object") { var name = options.name; var globalData = options.global; var out = new AsyncStream(globalData); if (name) { out.name = name; } return out; } else { return new AsyncStream(); } } describe("AsyncStream", function () { beforeEach(function (done) { // for (var k in require.cache) { // if (require.cache.hasOwnProperty(k)) { // delete require.cache[k]; // } // } done(); }); it("should render a series of sync calls correctly", function (done) { var out = new AsyncStream(); out.write("1"); out.write("2"); out.write("3"); out.write("4"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("1234"); done(); }); }); it("should resolve promise upon finish", function () { var out = new AsyncStream(); out.write("1"); out.write("2"); return out.end().then((result) => { const output = result.getOutput(); expect(output).to.equal("12"); expect(result.toString()).to.equal("12"); }); }); it("should render a series of sync and async calls correctly", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut1 = out.beginAsync(); setTimeout(function () { asyncOut1.write("2"); asyncOut1.end(); }, 100); out.write("3"); var asyncOut2 = out.beginAsync(); setTimeout(function () { asyncOut2.write("4"); asyncOut2.end(); }, 10); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("1234"); done(); }); }); it("should allow an async fragment to complete synchronously", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 10); out.write("3"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("123"); done(); }); }); it("should allow the async callback to provide data", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.end("2"); }, 10); out.write("3"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("123"); done(); }); }); it("should handle timeouts correctly", function (done) { var out = new AsyncStream(); var errors = []; out.on("error", function (e) { errors.push(e); }); out.write("1"); var asyncOut = out.beginAsync({ timeout: 100, name: "test" }); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 200); out.write("3"); out.end(); out.on("finish", function (result) { expect(errors.length).to.equal(1); expect(result.getOutput()).to.equal("13"); done(); }); }); it("should render nested async calls correctly", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.write("2a"); var nestedAsyncContext = asyncOut.beginAsync(); setTimeout(function () { nestedAsyncContext.write("2b"); nestedAsyncContext.end(); }, 10); asyncOut.write("2c"); asyncOut.end(); }, 10); out.write("3"); var asyncOut2 = out.beginAsync(); setTimeout(function () { var nestedAsyncContext = asyncOut2.beginAsync(); setTimeout(function () { nestedAsyncContext.write("4a"); nestedAsyncContext.end(); }, 10); var nestedAsyncContext2 = asyncOut2.beginAsync(); setTimeout(function () { nestedAsyncContext2.write("4b"); nestedAsyncContext2.end(); }, 10); asyncOut2.write("4c"); asyncOut2.end(); }, 10); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("12a2b2c34a4b4c"); done(); }); }); it("should handle odd execution ordering", function (done) { var outA = createAsyncStream({ name: "outA" }); outA.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("1234567"); done(); }); outA.write("1"); var outB = outA.beginAsync({ name: "outB" }); outA.write("3"); var outC = outA.beginAsync({ name: "outC" }); var outD = outC.beginAsync({ name: "outD" }); outB.write("2"); outB.end(); outA.write("7"); outC.write("5"); outD.write("4"); outD.end(); outC.write("6"); outC.end(); outA.end(); }); it("should handle sync errors correctly", function (done) { var out = new AsyncStream(); var errors = []; out.on("error", function (e) { errors.push(e); }); out.write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.error(new Error("test")); }, 10); out.write("3"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(errors.length).to.equal(1); expect(output).to.equal("13"); done(); }); }); it("should catch error in promise catch", function (done) { const out = new AsyncStream(); let errors = []; out.on("error", function (e) { errors.push(e); }); out.write("1"); let asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.error(new Error("test")); }, 10); out.write("3"); out.end().catch((err) => { expect(errors.length).to.equal(1); expect(err).to.be.an("error"); done(); }); }); it("should catch error in promise catch if `error` listener only set inside mixin", function (done) { const out = new AsyncStream(); out .catch((err) => { expect(err).to.be.an("error"); expect(out.getOutput()).to.equal("1"); done(); }) .then(() => { throw new Error("Should not get here!"); }); out.write("1"); out.error(new Error("test")); out.write("2"); }); it("should support chaining", function (done) { var errors = []; var out = new AsyncStream() .on("error", function (e) { errors.push(e); }) .on("finish", function (result) { var output = result.getOutput(); expect(errors.length).to.equal(1); expect(output).to.equal("13"); done(); }) .write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.error(new Error("test")); }, 10); out.write("3").end(); }); it("should support writing to a through2 stream", function (done) { var output = ""; var through2 = require("through2")( function write(data, encoding, callback) { output += data; callback(null, data); }, ); var errors = []; var out = createAsyncStream(through2) .on("error", function (e) { errors.push(e); }) .on("finish", function () { expect(errors.length).to.equal(0); expect(output).to.equal("123"); done(); }) .write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 10); out.write("3").end(); }); it("should support writing to a through stream", function (done) { var output = ""; var through = require("through")(function write(data) { output += data; this.queue(data); }); var errors = []; var out = createAsyncStream(through) .on("error", function (e) { errors.push(e); }) .on("end", function () { expect(errors.length).to.equal(0); expect(output).to.equal("123"); done(); }) .write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 10); out.write("3").end(); }); it("should support writing to a file output stream", function (done) { var outFile = nodePath.join(__dirname, "test.out"); var out = fs.createWriteStream(outFile, fsReadOptions); var errors = []; out.on("close", function () { var output = fs.readFileSync(outFile, fsReadOptions); expect(errors.length).to.equal(0); expect(output).to.equal("123"); fs.unlinkSync(outFile); done(); }); out = createAsyncStream(out) .on("error", function (e) { errors.push(e); }) .on("finish", function () {}) .write("1"); var asyncOut = out.beginAsync(); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 10); out.write("3").end(); }); it("should support piping to an async writer", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut = out.beginAsync(); var helloReadStream = fs.createReadStream( nodePath.join(__dirname, "fixtures/hello.txt"), fsReadOptions, ); helloReadStream.pipe(asyncOut); out.write("2"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("1Hello World2"); done(); }); }); it("should support a writer being piped to another stream", function (done) { var through = require("through")(); var outStr = ""; var out = require("through")(function write(str) { outStr += str; }); through.pipe(out).on("error", function (e) { done(e); }); out = createAsyncStream(through); out.write("1"); var asyncOut = out.beginAsync(); var helloReadStream = fs.createReadStream( nodePath.join(__dirname, "fixtures/hello.txt"), fsReadOptions, ); helloReadStream.pipe(asyncOut); out.write("2"); out.on("end", function () { expect(outStr).to.equal("1Hello World2"); done(); }); out.end(); }); it("should allow an async fragment to flush last", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut = out.beginAsync({ last: true }); out.once("last", function () { asyncOut.write("2"); asyncOut.end(); }); out.write("3"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("123"); done(); }); }); it("should allow an async fragment to flush last asynchronously", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut = out.beginAsync({ last: true }); var lastFiredCount = 0; out.on("last", function () { lastFiredCount++; }); out.once("last", function () { setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 10); }); out.write("3"); out.end(); out.on("finish", function (result) { expect(lastFiredCount).to.equal(1); var output = result.getOutput(); expect(output).to.equal("123"); done(); }); }); it("should not crash the program if the underlying stream has an error listener", function (done) { var stream = require("stream"); var PassThrough = stream.PassThrough; var passthrough = new PassThrough(); passthrough.on("error", function () { done(); }); var out = createAsyncStream(passthrough); out.write("hello"); out.error("test"); }); it("should crash the program if the underlying stream does *not* have an error listener", function (done) { var stream = require("stream"); var PassThrough = stream.PassThrough; var passthrough = new PassThrough(); var out = createAsyncStream(passthrough); out.write("hello"); try { out.error("test"); done("uncaught exception expected"); } catch (e) { done(); } }); it("should allow multiple onLast calls", function (done) { var out = new AsyncStream(); out.write("1"); var asyncOut1 = out.beginAsync(); setTimeout(function () { asyncOut1.write("2"); asyncOut1.end(); }, 20); var asyncOut2 = out.beginAsync({ last: true }); var onLastCount = 0; var lastOutput = []; out.onLast(function (next) { onLastCount++; lastOutput.push("a"); asyncOut2.write("3"); asyncOut2.end(); setTimeout(next, 10); }); var asyncOut3 = out.beginAsync({ last: true }); out.onLast(function (next) { onLastCount++; lastOutput.push("b"); asyncOut3.write("4"); asyncOut3.end(); next(); }); out.write("5"); out.end(); out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("12345"); expect(onLastCount).to.equal(2); expect(lastOutput).to.deep.equal(["a", "b"]); done(); }); }); it("should handle timeout errors correctly", function (done) { var output = ""; var errors = []; var through = require("through")(function write(data) { output += data; }) .on("error", function (err) { errors.push(err); }) .on("end", function () { expect(output).to.equal("12"); expect(errors.length).to.equal(1); expect(errors[0].toString()).to.contain("timed out"); done(); }); var out = createAsyncStream(through); out.write("1"); var asyncOut1 = out.beginAsync(); out.beginAsync({ timeout: 50 }); setTimeout(function () { asyncOut1.write("2"); asyncOut1.end(); }, 100); out.end(); }); it("should avoid writes after end (a)", function (done) { var output = ""; var errors = []; var ended = false; var through = require("through")(function write(data) { expect(ended).to.equal(false); output += data; }) .on("error", function (err) { errors.push(err); }) .on("end", function () { ended = true; expect(output).to.equal("13"); done(); }); var out = createAsyncStream(through); out.write("1"); var asyncOut1 = out.beginAsync({ timeout: 50 }); setTimeout(function () { asyncOut1.write("2"); asyncOut1.end(); }, 100); var asyncOut2 = out.beginAsync(); setTimeout(function () { asyncOut2.write("3"); asyncOut2.end(); }, 50); out.end(); }); it("should avoid writes after end (b)", function (done) { var output = ""; var errors = []; var ended = false; var through = require("through")(function write(data) { expect(ended).to.equal(false); output += data; }) .on("error", function (err) { errors.push(err); }) .on("end", function () { ended = true; expect(output).to.equal("12"); done(); }); var out = createAsyncStream(through); out.write("1"); var asyncOut1 = out.beginAsync(); setTimeout(function () { asyncOut1.write("2"); asyncOut1.end(); }, 75); var asyncOut2 = out.beginAsync({ timeout: 50 }); // This async fragment will timeout but we will // still write to it after it ends setTimeout(function () { asyncOut2.write("3"); asyncOut2.end(); }, 100); out.end(); }); it("should avoid writes after end (c)", function (done) { var output = ""; var errors = []; var ended = false; var through = require("through")(function write(data) { expect(ended).to.equal(false); output += data; }) .on("error", function (err) { errors.push(err); }) .on("end", function () { ended = true; expect(output).to.equal("12"); expect(errors.length).to.equal(1); done(); }); var out = createAsyncStream(through); out.write("1"); var asyncOut1 = out.beginAsync(); setTimeout(function () { asyncOut1.write("2"); asyncOut1.end(); }, 75); var asyncOut2 = out.beginAsync(); // This async fragment will timeout but we will // still write to it after it ends setTimeout(function () { asyncOut2.error("TEST ERROR"); asyncOut2.end(); }, 100); out.end(); }); it("should track finished correctly", function (done) { var myGlobal = {}; var out1 = createAsyncStream({ global: myGlobal }); var out2 = createAsyncStream({ global: myGlobal }); function handleFinished() { if (out1.data.__finishedFlag && out2.data.__finishedFlag) { done(); } } out1.on("finish", function () { if (out1.data.__finishedFlag) { return done(new Error("finished invoked multiple times!")); } out1.data.__finishedFlag = true; handleFinished(); }); out2.on("finish", function () { if (out2.data.__finishedFlag) { return done(new Error("finished invoked multiple times!")); } out2.data.__finishedFlag = true; handleFinished(); }); out1.write("foo"); out1.end(); out1.end(); out2.write("bar"); out2.end(); }); it("should end correctly if top-level is ended asynchronously", function (done) { var out = new AsyncStream(); out.name = "outer"; out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("123"); done(); }); out.write("1"); setTimeout(function () { var asyncOut = out.beginAsync({ name: "inner" }); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 50); out.write("3"); out.end(); }, 10); }); it("should end correctly if top-level is ended asynchronously when providing custom globals", function (done) { var out = createAsyncStream({ global: { foo: "bar" } }); out.name = "outer"; out.on("finish", function (result) { var output = result.getOutput(); expect(output).to.equal("123"); done(); }); out.write("1"); setTimeout(function () { var asyncOut = out.beginAsync({ name: "inner" }); setTimeout(function () { asyncOut.write("2"); asyncOut.end(); }, 50); out.write("3"); out.end(); }, 10); }); it("should support out.stream for accessing the original stream", function (done) { var through = require("through"); var stream = through(function write() {}); var out = createAsyncStream(stream); expect(out.stream).to.equal(stream); var asyncOut1 = out.beginAsync(); expect(asyncOut1.stream).to.equal(stream); setTimeout(function () { expect(asyncOut1.stream).to.equal(stream); asyncOut1.end(); }, 100); var asyncOut2 = out.beginAsync(); expect(asyncOut2.stream).to.equal(stream); setTimeout(function () { expect(asyncOut2.stream).to.equal(stream); asyncOut2.end(); }, 50); out.on("end", function () { expect(out.stream).to.equal(stream); expect(asyncOut1.stream).to.equal(stream); expect(asyncOut2.stream).to.equal(stream); done(); }); out.end(); }); });