mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
feat(): pg-query-stream typescript (#2376)
* feat(): start converting pg-query stream * feat(): solution project, initial version of typescript-pg-query stream * chore(): mocha with typescript * fix(): eslint ignore query stream dist * refactor(pg-query-stream): convert test to ts * chore(): fixed type errors * chore(): fix helper usage * chore(): use ts-node compatibile with node v8 * fix(): addd es extension * chore(): remove emitClose and added compilation for async iterators * chore(): condition for asyc iteration test * chore(): rename class to match ts-defs * chore(): tests to import from src instead of dist * chore(): remove prettier from peer deps: * chore(): update lock file
This commit is contained in:
parent
52dfca493c
commit
78a14a164d
@ -2,7 +2,7 @@
|
||||
"plugins": ["prettier"],
|
||||
"parser": "@typescript-eslint/parser",
|
||||
"extends": ["plugin:prettier/recommended", "prettier/@typescript-eslint"],
|
||||
"ignorePatterns": ["node_modules", "coverage", "packages/pg-protocol/dist/**/*"],
|
||||
"ignorePatterns": ["node_modules", "coverage", "packages/pg-protocol/dist/**/*", "packages/pg-query-stream/dist/**/*"],
|
||||
"parserOptions": {
|
||||
"ecmaVersion": 2017,
|
||||
"sourceType": "module"
|
||||
|
||||
@ -11,7 +11,8 @@
|
||||
],
|
||||
"scripts": {
|
||||
"test": "yarn lerna exec yarn test",
|
||||
"build": "yarn lerna exec --scope pg-protocol yarn build",
|
||||
"build": "tsc --build",
|
||||
"build:watch": "tsc --build --watch",
|
||||
"pretest": "yarn build",
|
||||
"lint": "eslint '*/**/*.{js,ts,tsx}'"
|
||||
},
|
||||
@ -23,7 +24,8 @@
|
||||
"eslint-plugin-node": "^11.1.0",
|
||||
"eslint-plugin-prettier": "^3.1.4",
|
||||
"lerna": "^3.19.0",
|
||||
"prettier": "2.1.2"
|
||||
"prettier": "2.1.2",
|
||||
"typescript": "^4.0.3"
|
||||
},
|
||||
"prettier": {
|
||||
"semi": false,
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
"chunky": "^0.0.0",
|
||||
"mocha": "^7.1.2",
|
||||
"ts-node": "^8.5.4",
|
||||
"typescript": "^3.7.3"
|
||||
"typescript": "^4.0.3"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "mocha dist/**/*.test.js",
|
||||
@ -21,5 +21,9 @@
|
||||
"build:watch": "tsc --watch",
|
||||
"prepublish": "yarn build",
|
||||
"pretest": "yarn build"
|
||||
}
|
||||
},
|
||||
"files": [
|
||||
"/dist/*{js,ts,map}",
|
||||
"/src"
|
||||
]
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
"moduleResolution": "node",
|
||||
"sourceMap": true,
|
||||
"outDir": "dist",
|
||||
"incremental": true,
|
||||
"baseUrl": ".",
|
||||
"declaration": true,
|
||||
"paths": {
|
||||
|
||||
@ -2,9 +2,10 @@
|
||||
"name": "pg-query-stream",
|
||||
"version": "3.3.2",
|
||||
"description": "Postgres query result returned as readable stream",
|
||||
"main": "index.js",
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"scripts": {
|
||||
"test": "mocha"
|
||||
"test": "mocha -r ts-node/register test/**/*.ts"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@ -16,12 +17,20 @@
|
||||
"query",
|
||||
"stream"
|
||||
],
|
||||
"files": [
|
||||
"/dist/*{js,ts,map}",
|
||||
"/src"
|
||||
],
|
||||
"author": "Brian M. Carlson",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://github.com/brianc/node-postgres/issues"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^14.0.0",
|
||||
"@types/pg": "^7.14.5",
|
||||
"@types/chai": "^4.2.13",
|
||||
"@types/mocha": "^8.0.3",
|
||||
"JSONStream": "~0.7.1",
|
||||
"concat-stream": "~1.0.1",
|
||||
"eslint-plugin-promise": "^3.5.0",
|
||||
@ -29,7 +38,9 @@
|
||||
"pg": "^8.4.2",
|
||||
"stream-spec": "~0.3.5",
|
||||
"stream-tester": "0.0.5",
|
||||
"through": "~2.3.4"
|
||||
"through": "~2.3.4",
|
||||
"ts-node": "^8.5.4",
|
||||
"typescript": "^4.0.3"
|
||||
},
|
||||
"dependencies": {
|
||||
"pg-cursor": "^2.4.2"
|
||||
|
||||
@ -1,11 +1,30 @@
|
||||
const { Readable } = require('stream')
|
||||
const Cursor = require('pg-cursor')
|
||||
import { Readable } from 'stream'
|
||||
import { Submittable, Connection } from 'pg'
|
||||
import Cursor from 'pg-cursor'
|
||||
|
||||
class PgQueryStream extends Readable {
|
||||
constructor(text, values, config = {}) {
|
||||
interface QueryStreamConfig {
|
||||
batchSize?: number
|
||||
highWaterMark?: number
|
||||
rowMode?: 'array'
|
||||
types?: any
|
||||
}
|
||||
|
||||
class QueryStream extends Readable implements Submittable {
|
||||
cursor: any
|
||||
_result: any
|
||||
|
||||
handleRowDescription: Function
|
||||
handleDataRow: Function
|
||||
handlePortalSuspended: Function
|
||||
handleCommandComplete: Function
|
||||
handleReadyForQuery: Function
|
||||
handleError: Function
|
||||
handleEmptyQuery: Function
|
||||
|
||||
public constructor(text: string, values?: any[], config: QueryStreamConfig = {}) {
|
||||
const { batchSize, highWaterMark = 100 } = config
|
||||
// https://nodejs.org/api/stream.html#stream_new_stream_readable_options
|
||||
super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark })
|
||||
|
||||
super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark })
|
||||
this.cursor = new Cursor(text, values, config)
|
||||
|
||||
// delegate Submittable callbacks to cursor
|
||||
@ -21,19 +40,19 @@ class PgQueryStream extends Readable {
|
||||
this._result = this.cursor._result
|
||||
}
|
||||
|
||||
submit(connection) {
|
||||
public submit(connection: Connection): void {
|
||||
this.cursor.submit(connection)
|
||||
}
|
||||
|
||||
_destroy(_err, cb) {
|
||||
this.cursor.close((err) => {
|
||||
public _destroy(_err: Error, cb: Function) {
|
||||
this.cursor.close((err?: Error) => {
|
||||
cb(err || _err)
|
||||
})
|
||||
}
|
||||
|
||||
// https://nodejs.org/api/stream.html#stream_readable_read_size_1
|
||||
_read(size) {
|
||||
this.cursor.read(size, (err, rows, result) => {
|
||||
public _read(size: number) {
|
||||
this.cursor.read(size, (err: Error, rows: any[]) => {
|
||||
if (err) {
|
||||
// https://nodejs.org/api/stream.html#stream_errors_while_reading
|
||||
this.destroy(err)
|
||||
@ -45,4 +64,4 @@ class PgQueryStream extends Readable {
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = PgQueryStream
|
||||
export = QueryStream
|
||||
@ -1,112 +0,0 @@
|
||||
const QueryStream = require('../')
|
||||
const pg = require('pg')
|
||||
const assert = require('assert')
|
||||
|
||||
const queryText = 'SELECT * FROM generate_series(0, 200) num'
|
||||
describe('Async iterator', () => {
|
||||
it('works', async () => {
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
const query = client.query(stream)
|
||||
const rows = []
|
||||
for await (const row of query) {
|
||||
rows.push(row)
|
||||
}
|
||||
assert.equal(rows.length, 201)
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can async iterate and then do a query afterwards', async () => {
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
const query = client.query(stream)
|
||||
const iteratorRows = []
|
||||
for await (const row of query) {
|
||||
iteratorRows.push(row)
|
||||
}
|
||||
assert.equal(iteratorRows.length, 201)
|
||||
const { rows } = await client.query('SELECT NOW()')
|
||||
assert.equal(rows.length, 1)
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can async iterate multiple times with a pool', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
|
||||
const allRows = []
|
||||
const run = async () => {
|
||||
// get the client
|
||||
const client = await pool.connect()
|
||||
// stream some rows
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const iteratorRows = []
|
||||
client.query(stream)
|
||||
for await (const row of stream) {
|
||||
iteratorRows.push(row)
|
||||
allRows.push(row)
|
||||
}
|
||||
assert.equal(iteratorRows.length, 201)
|
||||
client.release()
|
||||
}
|
||||
await Promise.all([run(), run(), run()])
|
||||
assert.equal(allRows.length, 603)
|
||||
await pool.end()
|
||||
})
|
||||
|
||||
it('can break out of iteration early', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const client = await pool.connect()
|
||||
const rows = []
|
||||
for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
|
||||
rows.push(row)
|
||||
break;
|
||||
}
|
||||
for await (const row of client.query(new QueryStream(queryText, []))) {
|
||||
rows.push(row)
|
||||
break;
|
||||
}
|
||||
for await (const row of client.query(new QueryStream(queryText, []))) {
|
||||
rows.push(row)
|
||||
break;
|
||||
}
|
||||
assert.strictEqual(rows.length, 3)
|
||||
client.release()
|
||||
await pool.end()
|
||||
})
|
||||
|
||||
it('only returns rows on first iteration', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const client = await pool.connect()
|
||||
const rows = []
|
||||
const stream = client.query(new QueryStream(queryText, []))
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
break;
|
||||
}
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
}
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
}
|
||||
assert.strictEqual(rows.length, 1)
|
||||
client.release()
|
||||
await pool.end()
|
||||
})
|
||||
|
||||
it('can read with delays', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const client = await pool.connect()
|
||||
const rows = []
|
||||
const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
await new Promise((resolve) => setTimeout(resolve, 1))
|
||||
}
|
||||
assert.strictEqual(rows.length, 201)
|
||||
client.release()
|
||||
await pool.end()
|
||||
})
|
||||
})
|
||||
@ -1,4 +0,0 @@
|
||||
// only newer versions of node support async iterator
|
||||
if (!process.version.startsWith('v8')) {
|
||||
require('./async-iterator.es6')
|
||||
}
|
||||
116
packages/pg-query-stream/test/async-iterator.ts
Normal file
116
packages/pg-query-stream/test/async-iterator.ts
Normal file
@ -0,0 +1,116 @@
|
||||
import QueryStream from '../src'
|
||||
import pg from 'pg'
|
||||
import assert from 'assert'
|
||||
|
||||
const queryText = 'SELECT * FROM generate_series(0, 200) num'
|
||||
|
||||
// node v8 do not support async iteration
|
||||
if (!process.version.startsWith('v8')) {
|
||||
describe('Async iterator', () => {
|
||||
it('works', async () => {
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
const query = client.query(stream)
|
||||
const rows = []
|
||||
for await (const row of query) {
|
||||
rows.push(row)
|
||||
}
|
||||
assert.equal(rows.length, 201)
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can async iterate and then do a query afterwards', async () => {
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
const query = client.query(stream)
|
||||
const iteratorRows = []
|
||||
for await (const row of query) {
|
||||
iteratorRows.push(row)
|
||||
}
|
||||
assert.equal(iteratorRows.length, 201)
|
||||
const { rows } = await client.query('SELECT NOW()')
|
||||
assert.equal(rows.length, 1)
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can async iterate multiple times with a pool', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
|
||||
const allRows = []
|
||||
const run = async () => {
|
||||
// get the client
|
||||
const client = await pool.connect()
|
||||
// stream some rows
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const iteratorRows = []
|
||||
client.query(stream)
|
||||
for await (const row of stream) {
|
||||
iteratorRows.push(row)
|
||||
allRows.push(row)
|
||||
}
|
||||
assert.equal(iteratorRows.length, 201)
|
||||
client.release()
|
||||
}
|
||||
await Promise.all([run(), run(), run()])
|
||||
assert.equal(allRows.length, 603)
|
||||
await pool.end()
|
||||
})
|
||||
|
||||
it('can break out of iteration early', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const client = await pool.connect()
|
||||
const rows = []
|
||||
for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
|
||||
rows.push(row)
|
||||
break
|
||||
}
|
||||
for await (const row of client.query(new QueryStream(queryText, []))) {
|
||||
rows.push(row)
|
||||
break
|
||||
}
|
||||
for await (const row of client.query(new QueryStream(queryText, []))) {
|
||||
rows.push(row)
|
||||
break
|
||||
}
|
||||
assert.strictEqual(rows.length, 3)
|
||||
client.release()
|
||||
await pool.end()
|
||||
})
|
||||
|
||||
it('only returns rows on first iteration', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const client = await pool.connect()
|
||||
const rows = []
|
||||
const stream = client.query(new QueryStream(queryText, []))
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
break
|
||||
}
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
}
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
}
|
||||
assert.strictEqual(rows.length, 1)
|
||||
client.release()
|
||||
await pool.end()
|
||||
})
|
||||
|
||||
it('can read with delays', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const client = await pool.connect()
|
||||
const rows = []
|
||||
const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
|
||||
for await (const row of stream) {
|
||||
rows.push(row)
|
||||
await new Promise((resolve) => setTimeout(resolve, 1))
|
||||
}
|
||||
assert.strictEqual(rows.length, 201)
|
||||
client.release()
|
||||
await pool.end()
|
||||
})
|
||||
})
|
||||
}
|
||||
@ -1,17 +1,18 @@
|
||||
var pg = require('pg')
|
||||
var assert = require('assert')
|
||||
var QueryStream = require('../')
|
||||
import pg from 'pg'
|
||||
import assert from 'assert'
|
||||
import QueryStream from '../src'
|
||||
|
||||
describe('client options', function () {
|
||||
it('uses custom types from client config', function (done) {
|
||||
const types = {
|
||||
getTypeParser: () => (string) => string,
|
||||
}
|
||||
var client = new pg.Client({ types })
|
||||
//@ts-expect-error
|
||||
const client = new pg.Client({ types })
|
||||
client.connect()
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 10) num')
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 10) num')
|
||||
const query = client.query(stream)
|
||||
const result = []
|
||||
query.on('data', (datum) => {
|
||||
result.push(datum)
|
||||
})
|
||||
@ -1,16 +1,18 @@
|
||||
var assert = require('assert')
|
||||
var concat = require('concat-stream')
|
||||
|
||||
var QueryStream = require('../')
|
||||
var helper = require('./helper')
|
||||
import assert from 'assert'
|
||||
import concat from 'concat-stream'
|
||||
import QueryStream from '../src'
|
||||
import helper from './helper'
|
||||
|
||||
if (process.version.startsWith('v8.')) {
|
||||
console.error('warning! node less than 10lts stream closing semantics may not behave properly')
|
||||
} else {
|
||||
helper('close', function (client) {
|
||||
it('emits close', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
|
||||
var query = client.query(stream)
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {
|
||||
batchSize: 2,
|
||||
highWaterMark: 2,
|
||||
})
|
||||
const query = client.query(stream)
|
||||
query.pipe(concat(function () {}))
|
||||
query.on('close', done)
|
||||
})
|
||||
@ -18,12 +20,12 @@ if (process.version.startsWith('v8.')) {
|
||||
|
||||
helper('early close', function (client) {
|
||||
it('can be closed early', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {
|
||||
batchSize: 2,
|
||||
highWaterMark: 2,
|
||||
})
|
||||
var query = client.query(stream)
|
||||
var readCount = 0
|
||||
const query = client.query(stream)
|
||||
let readCount = 0
|
||||
query.on('readable', function () {
|
||||
readCount++
|
||||
query.read()
|
||||
@ -38,7 +40,7 @@ if (process.version.startsWith('v8.')) {
|
||||
})
|
||||
|
||||
it('can destroy stream while reading', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
|
||||
client.query(stream)
|
||||
stream.on('data', () => done(new Error('stream should not have returned rows')))
|
||||
setTimeout(() => {
|
||||
@ -48,7 +50,7 @@ if (process.version.startsWith('v8.')) {
|
||||
})
|
||||
|
||||
it('emits an error when calling destroy with an error', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
|
||||
client.query(stream)
|
||||
stream.on('data', () => done(new Error('stream should not have returned rows')))
|
||||
setTimeout(() => {
|
||||
@ -63,7 +65,7 @@ if (process.version.startsWith('v8.')) {
|
||||
})
|
||||
|
||||
it('can destroy stream while reading an error', function (done) {
|
||||
var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
|
||||
const stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
|
||||
client.query(stream)
|
||||
stream.on('data', () => done(new Error('stream should not have returned rows')))
|
||||
stream.once('error', () => {
|
||||
@ -74,7 +76,7 @@ if (process.version.startsWith('v8.')) {
|
||||
})
|
||||
|
||||
it('does not crash when destroying the stream immediately after calling read', function (done) {
|
||||
var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
|
||||
const stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
|
||||
client.query(stream)
|
||||
stream.on('data', () => done(new Error('stream should not have returned rows')))
|
||||
stream.destroy()
|
||||
@ -82,7 +84,7 @@ if (process.version.startsWith('v8.')) {
|
||||
})
|
||||
|
||||
it('does not crash when destroying the stream before its submitted', function (done) {
|
||||
var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
|
||||
const stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
|
||||
stream.on('data', () => done(new Error('stream should not have returned rows')))
|
||||
stream.destroy()
|
||||
stream.on('close', done)
|
||||
@ -1,14 +1,13 @@
|
||||
var assert = require('assert')
|
||||
var concat = require('concat-stream')
|
||||
var through = require('through')
|
||||
var helper = require('./helper')
|
||||
|
||||
var QueryStream = require('../')
|
||||
import assert from 'assert'
|
||||
import concat from 'concat-stream'
|
||||
import through from 'through'
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('concat', function (client) {
|
||||
it('concats correctly', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
var query = client.query(stream)
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
const query = client.query(stream)
|
||||
query
|
||||
.pipe(
|
||||
through(function (row) {
|
||||
@ -17,7 +16,7 @@ helper('concat', function (client) {
|
||||
)
|
||||
.pipe(
|
||||
concat(function (result) {
|
||||
var total = result.reduce(function (prev, cur) {
|
||||
const total = result.reduce(function (prev, cur) {
|
||||
return prev + cur
|
||||
})
|
||||
assert.equal(total, 20100)
|
||||
@ -1,26 +0,0 @@
|
||||
var assert = require('assert')
|
||||
var QueryStream = require('../')
|
||||
|
||||
describe('stream config options', () => {
|
||||
// this is mostly for backwards compatability.
|
||||
it('sets readable.highWaterMark based on batch size', () => {
|
||||
var stream = new QueryStream('SELECT NOW()', [], {
|
||||
batchSize: 88,
|
||||
})
|
||||
assert.equal(stream._readableState.highWaterMark, 88)
|
||||
})
|
||||
|
||||
it('sets readable.highWaterMark based on highWaterMark config', () => {
|
||||
var stream = new QueryStream('SELECT NOW()', [], {
|
||||
highWaterMark: 88,
|
||||
})
|
||||
|
||||
assert.equal(stream._readableState.highWaterMark, 88)
|
||||
})
|
||||
|
||||
it('defaults to 100 for highWaterMark', () => {
|
||||
var stream = new QueryStream('SELECT NOW()', [])
|
||||
|
||||
assert.equal(stream._readableState.highWaterMark, 100)
|
||||
})
|
||||
})
|
||||
26
packages/pg-query-stream/test/config.ts
Normal file
26
packages/pg-query-stream/test/config.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import assert from 'assert'
|
||||
import QueryStream from '../src'
|
||||
|
||||
describe('stream config options', () => {
|
||||
// this is mostly for backwards compatibility.
|
||||
it('sets readable.highWaterMark based on batch size', () => {
|
||||
const stream = new QueryStream('SELECT NOW()', [], {
|
||||
batchSize: 88,
|
||||
})
|
||||
assert.equal(stream.readableHighWaterMark, 88)
|
||||
})
|
||||
|
||||
it('sets readable.highWaterMark based on highWaterMark config', () => {
|
||||
const stream = new QueryStream('SELECT NOW()', [], {
|
||||
highWaterMark: 88,
|
||||
})
|
||||
|
||||
assert.equal(stream.readableHighWaterMark, 88)
|
||||
})
|
||||
|
||||
it('defaults to 100 for highWaterMark', () => {
|
||||
const stream = new QueryStream('SELECT NOW()', [])
|
||||
|
||||
assert.equal(stream.readableHighWaterMark, 100)
|
||||
})
|
||||
})
|
||||
@ -1,6 +1,5 @@
|
||||
const assert = require('assert')
|
||||
const helper = require('./helper')
|
||||
const QueryStream = require('../')
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('empty-query', function (client) {
|
||||
it('handles empty query', function (done) {
|
||||
@ -1,12 +1,11 @@
|
||||
var assert = require('assert')
|
||||
var helper = require('./helper')
|
||||
|
||||
var QueryStream = require('../')
|
||||
import assert from 'assert'
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('error', function (client) {
|
||||
it('receives error on stream', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM asdf num', [])
|
||||
var query = client.query(stream)
|
||||
const stream = new QueryStream('SELECT * FROM asdf num', [])
|
||||
const query = client.query(stream)
|
||||
query
|
||||
.on('error', function (err) {
|
||||
assert(err)
|
||||
@ -1,14 +1,14 @@
|
||||
var assert = require('assert')
|
||||
var helper = require('./helper')
|
||||
var QueryStream = require('../')
|
||||
import assert from 'assert'
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('fast reader', function (client) {
|
||||
it('works', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
const query = client.query(stream)
|
||||
const result = []
|
||||
stream.on('readable', function () {
|
||||
var res = stream.read()
|
||||
let res = stream.read()
|
||||
while (res) {
|
||||
if (result.length !== 201) {
|
||||
assert(res, 'should not return null on evented reader')
|
||||
@ -24,7 +24,7 @@ helper('fast reader', function (client) {
|
||||
}
|
||||
})
|
||||
stream.on('end', function () {
|
||||
var total = result.reduce(function (prev, cur) {
|
||||
const total = result.reduce(function (prev, cur) {
|
||||
return prev + cur
|
||||
})
|
||||
assert.equal(total, 20100)
|
||||
@ -1,7 +1,8 @@
|
||||
var pg = require('pg')
|
||||
module.exports = function (name, cb) {
|
||||
import pg from 'pg'
|
||||
|
||||
export default function (name, cb) {
|
||||
describe(name, function () {
|
||||
var client = new pg.Client()
|
||||
const client = new pg.Client()
|
||||
|
||||
before(function (done) {
|
||||
client.connect(done)
|
||||
@ -1,17 +0,0 @@
|
||||
var assert = require('assert')
|
||||
var concat = require('concat-stream')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
require('./helper')('instant', function (client) {
|
||||
it('instant', function (done) {
|
||||
var query = new QueryStream('SELECT pg_sleep(1)', [])
|
||||
var stream = client.query(query)
|
||||
stream.pipe(
|
||||
concat(function (res) {
|
||||
assert.equal(res.length, 1)
|
||||
done()
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
17
packages/pg-query-stream/test/instant.ts
Normal file
17
packages/pg-query-stream/test/instant.ts
Normal file
@ -0,0 +1,17 @@
|
||||
import helper from './helper'
|
||||
import assert from 'assert'
|
||||
import concat from 'concat-stream'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('instant', function (client) {
|
||||
it('instant', function (done) {
|
||||
const query = new QueryStream('SELECT pg_sleep(1)', [])
|
||||
const stream = client.query(query)
|
||||
stream.pipe(
|
||||
concat(function (res) {
|
||||
assert.equal(res.length, 1)
|
||||
done()
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
@ -1,8 +1,9 @@
|
||||
var pg = require('pg')
|
||||
var QueryStream = require('../')
|
||||
import pg from 'pg'
|
||||
import QueryStream from '../src'
|
||||
|
||||
describe('end semantics race condition', function () {
|
||||
before(function (done) {
|
||||
var client = new pg.Client()
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
client.on('drain', client.end.bind(client))
|
||||
client.on('end', done)
|
||||
@ -10,14 +11,14 @@ describe('end semantics race condition', function () {
|
||||
client.query('create table IF NOT EXISTS c(id int primary key references p)')
|
||||
})
|
||||
it('works', function (done) {
|
||||
var client1 = new pg.Client()
|
||||
const client1 = new pg.Client()
|
||||
client1.connect()
|
||||
var client2 = new pg.Client()
|
||||
const client2 = new pg.Client()
|
||||
client2.connect()
|
||||
|
||||
var qr = new QueryStream('INSERT INTO p DEFAULT VALUES RETURNING id')
|
||||
const qr = new QueryStream('INSERT INTO p DEFAULT VALUES RETURNING id')
|
||||
client1.query(qr)
|
||||
var id = null
|
||||
let id = null
|
||||
qr.on('data', function (row) {
|
||||
id = row.id
|
||||
})
|
||||
@ -1,12 +1,12 @@
|
||||
var assert = require('assert')
|
||||
var helper = require('./helper')
|
||||
var QueryStream = require('../')
|
||||
import assert from 'assert'
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('passing options', function (client) {
|
||||
it('passes row mode array', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 10) num', [], { rowMode: 'array' })
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 10) num', [], { rowMode: 'array' })
|
||||
const query = client.query(stream)
|
||||
const result = []
|
||||
query.on('data', (datum) => {
|
||||
result.push(datum)
|
||||
})
|
||||
@ -21,9 +21,9 @@ helper('passing options', function (client) {
|
||||
const types = {
|
||||
getTypeParser: () => (string) => string,
|
||||
}
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 10) num', [], { types })
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 10) num', [], { types })
|
||||
const query = client.query(stream)
|
||||
const result = []
|
||||
query.on('data', (datum) => {
|
||||
result.push(datum)
|
||||
})
|
||||
@ -1,23 +0,0 @@
|
||||
var concat = require('concat-stream')
|
||||
var tester = require('stream-tester')
|
||||
var JSONStream = require('JSONStream')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
require('./helper')('pauses', function (client) {
|
||||
it('pauses', function (done) {
|
||||
this.timeout(5000)
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], { batchSize: 2, highWaterMark: 2 })
|
||||
var query = client.query(stream)
|
||||
var pauser = tester.createPauseStream(0.1, 100)
|
||||
query
|
||||
.pipe(JSONStream.stringify())
|
||||
.pipe(pauser)
|
||||
.pipe(
|
||||
concat(function (json) {
|
||||
JSON.parse(json)
|
||||
done()
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
26
packages/pg-query-stream/test/pauses.ts
Normal file
26
packages/pg-query-stream/test/pauses.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import helper from './helper'
|
||||
import concat from 'concat-stream'
|
||||
import tester from 'stream-tester'
|
||||
import JSONStream from 'JSONStream'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('pauses', function (client) {
|
||||
it('pauses', function (done) {
|
||||
this.timeout(5000)
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {
|
||||
batchSize: 2,
|
||||
highWaterMark: 2,
|
||||
})
|
||||
const query = client.query(stream)
|
||||
const pauser = tester.createPauseStream(0.1, 100)
|
||||
query
|
||||
.pipe(JSONStream.stringify())
|
||||
.pipe(pauser)
|
||||
.pipe(
|
||||
concat(function (json) {
|
||||
JSON.parse(json)
|
||||
done()
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
@ -1,10 +1,10 @@
|
||||
var helper = require('./helper')
|
||||
var QueryStream = require('../')
|
||||
var concat = require('concat-stream')
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
import concat from 'concat-stream'
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
import { Transform } from 'stream'
|
||||
|
||||
var mapper = new Transform({ objectMode: true })
|
||||
const mapper = new Transform({ objectMode: true })
|
||||
|
||||
mapper._transform = function (obj, enc, cb) {
|
||||
this.push(obj)
|
||||
@ -14,7 +14,7 @@ mapper._transform = function (obj, enc, cb) {
|
||||
helper('slow reader', function (client) {
|
||||
it('works', function (done) {
|
||||
this.timeout(50000)
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {
|
||||
highWaterMark: 100,
|
||||
batchSize: 50,
|
||||
})
|
||||
@ -1,25 +0,0 @@
|
||||
var QueryStream = require('../')
|
||||
var spec = require('stream-spec')
|
||||
var assert = require('assert')
|
||||
|
||||
require('./helper')('stream tester timestamp', function (client) {
|
||||
it('should not warn about max listeners', function (done) {
|
||||
var sql = "SELECT * FROM generate_series('1983-12-30 00:00'::timestamp, '2013-12-30 00:00', '1 years')"
|
||||
var stream = new QueryStream(sql, [])
|
||||
var ended = false
|
||||
var query = client.query(stream)
|
||||
query.on('end', function () {
|
||||
ended = true
|
||||
})
|
||||
spec(query).readable().pausable({ strict: true }).validateOnExit()
|
||||
var checkListeners = function () {
|
||||
assert(stream.listeners('end').length < 10)
|
||||
if (!ended) {
|
||||
setImmediate(checkListeners)
|
||||
} else {
|
||||
done()
|
||||
}
|
||||
}
|
||||
checkListeners()
|
||||
})
|
||||
})
|
||||
26
packages/pg-query-stream/test/stream-tester-timestamp.ts
Normal file
26
packages/pg-query-stream/test/stream-tester-timestamp.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
import spec from 'stream-spec'
|
||||
import assert from 'assert'
|
||||
|
||||
helper('stream tester timestamp', function (client) {
|
||||
it('should not warn about max listeners', function (done) {
|
||||
const sql = "SELECT * FROM generate_series('1983-12-30 00:00'::timestamp, '2013-12-30 00:00', '1 years')"
|
||||
const stream = new QueryStream(sql, [])
|
||||
let ended = false
|
||||
const query = client.query(stream)
|
||||
query.on('end', function () {
|
||||
ended = true
|
||||
})
|
||||
spec(query).readable().pausable({ strict: true }).validateOnExit()
|
||||
const checkListeners = function () {
|
||||
assert(stream.listeners('end').length < 10)
|
||||
if (!ended) {
|
||||
setImmediate(checkListeners)
|
||||
} else {
|
||||
done()
|
||||
}
|
||||
}
|
||||
checkListeners()
|
||||
})
|
||||
})
|
||||
@ -1,12 +0,0 @@
|
||||
var spec = require('stream-spec')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
require('./helper')('stream tester', function (client) {
|
||||
it('passes stream spec', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
var query = client.query(stream)
|
||||
spec(query).readable().pausable({ strict: true }).validateOnExit()
|
||||
stream.on('end', done)
|
||||
})
|
||||
})
|
||||
12
packages/pg-query-stream/test/stream-tester.ts
Normal file
12
packages/pg-query-stream/test/stream-tester.ts
Normal file
@ -0,0 +1,12 @@
|
||||
import spec from 'stream-spec'
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
|
||||
helper('stream tester', function (client) {
|
||||
it('passes stream spec', function (done) {
|
||||
const stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
const query = client.query(stream)
|
||||
spec(query).readable().pausable({ strict: true }).validateOnExit()
|
||||
stream.on('end', done)
|
||||
})
|
||||
})
|
||||
26
packages/pg-query-stream/tsconfig.json
Normal file
26
packages/pg-query-stream/tsconfig.json
Normal file
@ -0,0 +1,26 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"module": "commonjs",
|
||||
"esModuleInterop": true,
|
||||
"allowSyntheticDefaultImports": true,
|
||||
"strict": false,
|
||||
"target": "es6",
|
||||
"noImplicitAny": false,
|
||||
"moduleResolution": "node",
|
||||
"sourceMap": true,
|
||||
"pretty": true,
|
||||
"outDir": "dist",
|
||||
"incremental": true,
|
||||
"baseUrl": ".",
|
||||
"declaration": true,
|
||||
"types": [
|
||||
"node",
|
||||
"pg",
|
||||
"mocha",
|
||||
"chai"
|
||||
]
|
||||
},
|
||||
"include": [
|
||||
"src/**/*"
|
||||
]
|
||||
}
|
||||
12
tsconfig.json
Normal file
12
tsconfig.json
Normal file
@ -0,0 +1,12 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"strict": true,
|
||||
"incremental": true,
|
||||
"composite": true
|
||||
},
|
||||
"include": [],
|
||||
"references": [
|
||||
{"path": "./packages/pg-query-stream"},
|
||||
{"path": "./packages/pg-protocol"}
|
||||
]
|
||||
}
|
||||
39
yarn.lock
39
yarn.lock
@ -941,7 +941,7 @@
|
||||
dependencies:
|
||||
"@types/node" ">= 8"
|
||||
|
||||
"@types/chai@^4.2.7":
|
||||
"@types/chai@^4.2.13", "@types/chai@^4.2.7":
|
||||
version "4.2.13"
|
||||
resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.2.13.tgz#8a3801f6655179d1803d81e94a2e4aaf317abd16"
|
||||
integrity sha512-o3SGYRlOpvLFpwJA6Sl1UPOwKFEvE4FxTEB/c9XHI2whdnd4kmPVkNLL8gY4vWGBxWWDumzLbKsAhEH5SKn37Q==
|
||||
@ -974,21 +974,44 @@
|
||||
resolved "https://registry.yarnpkg.com/@types/mocha/-/mocha-5.2.7.tgz#315d570ccb56c53452ff8638738df60726d5b6ea"
|
||||
integrity sha512-NYrtPht0wGzhwe9+/idPaBB+TqkY9AhTvOLMkThm0IoEfLaiVQZwBwyJ5puCkO3AUCWrmcoePjp2mbFocKy4SQ==
|
||||
|
||||
"@types/mocha@^8.0.3":
|
||||
version "8.0.3"
|
||||
resolved "https://registry.yarnpkg.com/@types/mocha/-/mocha-8.0.3.tgz#51b21b6acb6d1b923bbdc7725c38f9f455166402"
|
||||
integrity sha512-vyxR57nv8NfcU0GZu8EUXZLTbCMupIUwy95LJ6lllN+JRPG25CwMHoB1q5xKh8YKhQnHYRAn4yW2yuHbf/5xgg==
|
||||
|
||||
"@types/node@*", "@types/node@>= 8":
|
||||
version "14.11.8"
|
||||
resolved "https://registry.yarnpkg.com/@types/node/-/node-14.11.8.tgz#fe2012f2355e4ce08bca44aeb3abbb21cf88d33f"
|
||||
integrity sha512-KPcKqKm5UKDkaYPTuXSx8wEP7vE9GnuaXIZKijwRYcePpZFDVuy2a57LarFKiORbHOuTOOwYzxVxcUzsh2P2Pw==
|
||||
version "12.12.21"
|
||||
resolved "https://registry.yarnpkg.com/@types/node/-/node-12.12.21.tgz#aa44a6363291c7037111c47e4661ad210aded23f"
|
||||
integrity sha512-8sRGhbpU+ck1n0PGAUgVrWrWdjSW2aqNeyC15W88GRsMpSwzv6RJGlLhE7s2RhVSOdyDmxbqlWSeThq4/7xqlA==
|
||||
|
||||
"@types/node@^12.12.21":
|
||||
version "12.12.67"
|
||||
resolved "https://registry.yarnpkg.com/@types/node/-/node-12.12.67.tgz#4f86badb292e822e3b13730a1f9713ed2377f789"
|
||||
integrity sha512-R48tgL2izApf+9rYNH+3RBMbRpPeW3N8f0I9HMhggeq4UXwBDqumJ14SDs4ctTMhG11pIOduZ4z3QWGOiMc9Vg==
|
||||
|
||||
"@types/node@^14.0.0":
|
||||
version "14.11.8"
|
||||
resolved "https://registry.yarnpkg.com/@types/node/-/node-14.11.8.tgz#fe2012f2355e4ce08bca44aeb3abbb21cf88d33f"
|
||||
integrity sha512-KPcKqKm5UKDkaYPTuXSx8wEP7vE9GnuaXIZKijwRYcePpZFDVuy2a57LarFKiORbHOuTOOwYzxVxcUzsh2P2Pw==
|
||||
|
||||
"@types/normalize-package-data@^2.4.0":
|
||||
version "2.4.0"
|
||||
resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.0.tgz#e486d0d97396d79beedd0a6e33f4534ff6b4973e"
|
||||
integrity sha512-f5j5b/Gf71L+dbqxIpQ4Z2WlmI/mPJ0fOkGGmFgtb6sAu97EPczzbS3/tJKxmcYDj55OX6ssqwDAWOHIYDRDGA==
|
||||
|
||||
"@types/pg-types@*":
|
||||
version "1.11.5"
|
||||
resolved "https://registry.yarnpkg.com/@types/pg-types/-/pg-types-1.11.5.tgz#1eebbe62b6772fcc75c18957a90f933d155e005b"
|
||||
integrity sha512-L8ogeT6vDzT1vxlW3KITTCt+BVXXVkLXfZ/XNm6UqbcJgxf+KPO7yjWx7dQQE8RW07KopL10x2gNMs41+IkMGQ==
|
||||
|
||||
"@types/pg@^7.14.5":
|
||||
version "7.14.5"
|
||||
resolved "https://registry.yarnpkg.com/@types/pg/-/pg-7.14.5.tgz#07638c7aa69061abe4be31267028cc5c3fc35f98"
|
||||
integrity sha512-wqTKZmqkqXd1YiVRBT2poRrMIojwEi2bKTAAjUX6nEbzr98jc3cfR/7o7ZtubhH5xT7YJ6LRdRr1GZOgs8OUjg==
|
||||
dependencies:
|
||||
"@types/node" "*"
|
||||
"@types/pg-types" "*"
|
||||
|
||||
"@typescript-eslint/eslint-plugin@^4.4.0":
|
||||
version "4.4.0"
|
||||
resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.4.0.tgz#0321684dd2b902c89128405cf0385e9fe8561934"
|
||||
@ -6096,10 +6119,10 @@ typedarray@^0.0.6:
|
||||
resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777"
|
||||
integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=
|
||||
|
||||
typescript@^3.7.3:
|
||||
version "3.9.7"
|
||||
resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.9.7.tgz#98d600a5ebdc38f40cb277522f12dc800e9e25fa"
|
||||
integrity sha512-BLbiRkiBzAwsjut4x/dsibSTB6yWpwT5qWmC2OfuCg3GgVQCSgMs4vEctYPhsaGtd0AeuuHMkjZ2h2WG8MSzRw==
|
||||
typescript@^4.0.3:
|
||||
version "4.0.3"
|
||||
resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.0.3.tgz#153bbd468ef07725c1df9c77e8b453f8d36abba5"
|
||||
integrity sha512-tEu6DGxGgRJPb/mVPIZ48e69xCn2yRmCgYmDugAVwmJ6o+0u1RI18eO7E7WBTLYLaEVVOhwQmcdhQHweux/WPg==
|
||||
|
||||
uglify-js@^3.1.4:
|
||||
version "3.11.1"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user