// Mirror of https://github.com/brianc/node-postgres.git
// Synced 2025-12-08 20:16:25 +00:00
// 134 lines, 4.0 KiB, TypeScript
import QueryStream from '../src'
|
|
import pg from 'pg'
|
|
import assert from 'assert'
|
|
|
|
// Query shared by the streaming tests below; they expect it to yield 201 rows
// (generate_series(0, 200) includes both bounds).
const queryText = 'SELECT * FROM generate_series(0, 200) num'
|
|
|
|
// Node v8 does not support async iteration
|
|
if (!process.version.startsWith('v8')) {
|
|
describe('Async iterator', () => {
|
|
// Streams the whole result set through `for await` on a dedicated client
// and checks that every row was delivered.
it('works', async () => {
  const stream = new QueryStream(queryText, [])
  const client = new pg.Client()
  await client.connect()
  try {
    const query = client.query(stream)
    const rows = []
    for await (const row of query) {
      rows.push(row)
    }
    // queryText selects generate_series(0, 200), i.e. 201 rows.
    assert.strictEqual(rows.length, 201)
  } finally {
    // Close the connection even when iteration or the assertion throws,
    // so a failing run does not leave an open socket behind.
    await client.end()
  }
})
|
|
|
|
// Drains a streamed query with `for await`, then runs a plain query on the
// same client to check the connection is still usable afterwards.
it('can async iterate and then do a query afterwards', async () => {
  const stream = new QueryStream(queryText, [])
  const client = new pg.Client()
  await client.connect()
  try {
    const query = client.query(stream)
    const iteratorRows = []
    for await (const row of query) {
      iteratorRows.push(row)
    }
    assert.strictEqual(iteratorRows.length, 201)

    // Follow-up non-streaming query on the same connection.
    const { rows } = await client.query('SELECT NOW()')
    assert.strictEqual(rows.length, 1)
  } finally {
    // Always close the connection, including when an assertion above fails.
    await client.end()
  }
})
|
|
|
|
// Starts three streaming runs concurrently against a pool limited to one
// client and checks that each run, and all runs combined, see every row.
it('can async iterate multiple times with a pool', async () => {
  const pool = new pg.Pool({ max: 1 })
  try {
    const allRows = []
    const run = async () => {
      // get the client
      const client = await pool.connect()
      try {
        // stream some rows
        const stream = new QueryStream(queryText, [])
        const iteratorRows = []
        client.query(stream)
        for await (const row of stream) {
          iteratorRows.push(row)
          allRows.push(row)
        }
        assert.strictEqual(iteratorRows.length, 201)
      } finally {
        // The pool holds a single client (max: 1); hand it back even on
        // failure so it is not left checked out.
        client.release()
      }
    }
    await Promise.all([run(), run(), run()])
    // 3 runs x 201 rows each.
    assert.strictEqual(allRows.length, 603)
  } finally {
    // Shut the pool down regardless of the test outcome.
    await pool.end()
  }
})
|
|
|
|
// Runs three streamed queries one after another on the same pooled client,
// breaking out of each loop after the first row.
it('can break out of iteration early', async () => {
  const pool = new pg.Pool({ max: 1 })
  try {
    const client = await pool.connect()
    try {
      const rows = []
      // First query uses batchSize: 1; the next two use the default options.
      for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
        rows.push(row)
        break
      }
      for await (const row of client.query(new QueryStream(queryText, []))) {
        rows.push(row)
        break
      }
      for await (const row of client.query(new QueryStream(queryText, []))) {
        rows.push(row)
        break
      }
      // Exactly one row was collected from each of the three loops.
      assert.strictEqual(rows.length, 3)
    } finally {
      // Return the client to the pool even if a query or the assertion throws.
      client.release()
    }
  } finally {
    await pool.end()
  }
})
|
|
|
|
// Breaks out of a stream after one row, then iterates the same stream object
// again and checks that no further rows are produced.
it('only returns rows on first iteration', async () => {
  const pool = new pg.Pool({ max: 1 })
  try {
    const client = await pool.connect()
    try {
      const rows = []
      const stream = client.query(new QueryStream(queryText, []))
      for await (const row of stream) {
        rows.push(row)
        break
      }

      try {
        // Re-iterating the already-abandoned stream must not yield rows.
        for await (const row of stream) {
          rows.push(row)
        }
        for await (const row of stream) {
          rows.push(row)
        }
      } catch (e) {
        // swallow error - node 17 throws if stream is aborted
      }
      // Only the single row from the first loop was collected.
      assert.strictEqual(rows.length, 1)
    } finally {
      // Return the client to the pool even if the assertion fails.
      client.release()
    }
  } finally {
    await pool.end()
  }
})
|
|
|
|
// Consumes the stream with batchSize: 1 while pausing 1 ms after every row,
// and checks that all rows still arrive.
it('can read with delays', async () => {
  const pool = new pg.Pool({ max: 1 })
  try {
    const client = await pool.connect()
    try {
      const rows = []
      const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
      for await (const row of stream) {
        rows.push(row)
        // Yield to the timer queue between rows to simulate a slow consumer.
        await new Promise((resolve) => setTimeout(resolve, 1))
      }
      assert.strictEqual(rows.length, 201)
    } finally {
      // Return the client to the pool even if iteration or the assertion throws.
      client.release()
    }
  } finally {
    await pool.end()
  }
})
|
|
|
|
// Breaks out of three consecutive single-row streams created with
// highWaterMark: 1 on the same pooled client. The test passes if none of the
// loops throws or hangs; there is no explicit assertion.
it('supports breaking with low watermark', async function () {
  const pool = new pg.Pool({ max: 1 })
  try {
    const client = await pool.connect()
    try {
      for await (const _ of client.query(new QueryStream('select TRUE', [], { highWaterMark: 1 }))) break
      for await (const _ of client.query(new QueryStream('select TRUE', [], { highWaterMark: 1 }))) break
      for await (const _ of client.query(new QueryStream('select TRUE', [], { highWaterMark: 1 }))) break
    } finally {
      // Return the client to the pool even if one of the queries throws.
      client.release()
    }
  } finally {
    await pool.end()
  }
})
|
|
})
|
|
}
|