diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 04124f8a..76906712 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -566,7 +566,7 @@ Client.prototype.end = function (cb) { this._ending = true // if we have never connected, then end is a noop, callback immediately - if (this.connection.stream.readyState === 'closed') { + if (!this.connection._connecting) { if (cb) { cb() } else { diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index acc5c0e8..6344b417 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -42,13 +42,10 @@ util.inherits(Connection, EventEmitter) Connection.prototype.connect = function (port, host) { var self = this - if (this.stream.readyState === 'closed') { - this.stream.connect(port, host) - } else if (this.stream.readyState === 'open') { - this.emit('connect') - } + this._connecting = true + this.stream.connect(port, host) - this.stream.on('connect', function () { + this.stream.once('connect', function () { if (self._keepAlive) { self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) } @@ -187,7 +184,7 @@ const endBuffer = serialize.end() Connection.prototype.end = function () { // 0x58 = 'X' this._ending = true - if (!this.stream.writable) { + if (!this._connecting || !this.stream.writable) { this.stream.end() return } diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 243872c9..c3f30aa0 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -50,13 +50,10 @@ util.inherits(Connection, EventEmitter) Connection.prototype.connect = function (port, host) { var self = this - if (this.stream.readyState === 'closed') { - this.stream.connect(port, host) - } else if (this.stream.readyState === 'open') { - this.emit('connect') - } + this._connecting = true + this.stream.connect(port, host) - this.stream.on('connect', function () { + this.stream.once('connect', function () { if (self._keepAlive) { self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) } @@ -316,7 +313,7 @@ Connection.prototype.end = function () { // 0x58 = 'X' this.writer.add(emptyBuffer) this._ending = true - if (!this.stream.writable) { + if (!this._connecting || !this.stream.writable) { this.stream.end() return } diff --git a/packages/pg/test/unit/client/stream-and-query-error-interaction-tests.js b/packages/pg/test/unit/client/stream-and-query-error-interaction-tests.js index 9b0a3560..041af010 100644 --- a/packages/pg/test/unit/client/stream-and-query-error-interaction-tests.js +++ b/packages/pg/test/unit/client/stream-and-query-error-interaction-tests.js @@ -5,6 +5,9 @@ var Client = require(__dirname + '/../../../lib/client') test('emits end when not in query', function () { var stream = new (require('events').EventEmitter)() + stream.connect = function () { + // NOOP + } stream.write = function () { // NOOP } diff --git a/packages/pg/test/unit/connection/inbound-parser-tests.js b/packages/pg/test/unit/connection/inbound-parser-tests.js index 5f92cdc5..f3690cc6 100644 --- a/packages/pg/test/unit/connection/inbound-parser-tests.js +++ b/packages/pg/test/unit/connection/inbound-parser-tests.js @@ -399,7 +399,6 @@ test('Connection', function () { test('split buffer, single message parsing', function () { var fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!']) var stream = new MemoryStream() - stream.readyState = 'open' var client = new Connection({ stream: stream, }) diff --git a/packages/pg/test/unit/connection/outbound-sending-tests.js b/packages/pg/test/unit/connection/outbound-sending-tests.js index b40af000..8b21de4c 100644 --- a/packages/pg/test/unit/connection/outbound-sending-tests.js +++ b/packages/pg/test/unit/connection/outbound-sending-tests.js @@ -5,6 +5,7 @@ var stream = new MemoryStream() var con = new Connection({ stream: stream, }) +con._connecting = true assert.received = function (stream, buffer) { assert.lengthIs(stream.packets, 1) diff --git a/packages/pg/test/unit/connection/startup-tests.js b/packages/pg/test/unit/connection/startup-tests.js index 09a710c7..6e317d70 100644 --- a/packages/pg/test/unit/connection/startup-tests.js +++ b/packages/pg/test/unit/connection/startup-tests.js @@ -7,10 +7,9 @@ test('connection can take existing stream', function () { assert.equal(con.stream, stream) }) -test('using closed stream', function () { +test('using any stream', function () { var makeStream = function () { var stream = new MemoryStream() - stream.readyState = 'closed' stream.connect = function (port, host) { this.connectCalled = true this.port = port @@ -65,20 +64,3 @@ test('using closed stream', function () { }) }) }) - -test('using opened stream', function () { - var stream = new MemoryStream() - stream.readyState = 'open' - stream.connect = function () { - assert.ok(false, 'Should not call open') - } - var con = new Connection({ stream: stream }) - test('does not call open', function () { - var hit = false - con.once('connect', function () { - hit = true - }) - con.connect() - assert.ok(hit) - }) -}) diff --git a/packages/pg/test/unit/test-helper.js b/packages/pg/test/unit/test-helper.js index 5793251b..918b1418 100644 --- a/packages/pg/test/unit/test-helper.js +++ b/packages/pg/test/unit/test-helper.js @@ -13,6 +13,10 @@ helper.sys.inherits(MemoryStream, EventEmitter) var p = MemoryStream.prototype +p.connect = function () { + // NOOP +} + p.write = function (packet, cb) { this.packets.push(packet) if (cb) { @@ -30,7 +34,6 @@ p.writable = true const createClient = function () { var stream = new MemoryStream() - stream.readyState = 'open' var client = new Client({ connection: new Connection({ stream: stream }), })