Brian C fe88e825e5
Add pg-native to monorepo (#3225)
I didn't do much to "modernize" the pg-native codebase other than running it through the standard eslint --fix that is applied to the rest of the code. There's some easy opportunities there to update it to es6 and so on...it still uses some pretty antiquated coding styles in places.  This PR re-introduces the native tests on node v20, and updates test matrix to drop unsupported versions of node & add in node v22.
2024-06-04 10:26:11 -05:00

156 lines
3.6 KiB
JavaScript

var Duplex = require('stream').Duplex
var Writable = require('stream').Writable
var util = require('util')
var CopyStream = (module.exports = function (pq, options) {
Duplex.call(this, options)
this.pq = pq
this._reading = false
})
util.inherits(CopyStream, Duplex)
// writer methods
CopyStream.prototype._write = function (chunk, encoding, cb) {
var result = this.pq.putCopyData(chunk)
// sent successfully
if (result === 1) return cb()
// error
if (result === -1) return cb(new Error(this.pq.errorMessage()))
// command would block. wait for writable and call again.
var self = this
this.pq.writable(function () {
self._write(chunk, encoding, cb)
})
}
CopyStream.prototype.end = function () {
var args = Array.prototype.slice.call(arguments, 0)
var self = this
var callback = args.pop()
if (args.length) {
this.write(args[0])
}
var result = this.pq.putCopyEnd()
// sent successfully
if (result === 1) {
// consume our results and then call 'end' on the
// "parent" writable class so we can emit 'finish' and
// all that jazz
return consumeResults(this.pq, function (err, res) {
Writable.prototype.end.call(self)
// handle possible passing of callback to end method
if (callback) {
callback(err)
}
})
}
// error
if (result === -1) {
var err = new Error(this.pq.errorMessage())
return this.emit('error', err)
}
// command would block. wait for writable and call end again
// don't pass any buffers to end on the second call because
// we already sent them to possible this.write the first time
// we called end
return this.pq.writable(function () {
return self.end.apply(self, callback)
})
}
// reader methods
CopyStream.prototype._consumeBuffer = function (cb) {
var result = this.pq.getCopyData(true)
if (result instanceof Buffer) {
return setImmediate(function () {
cb(null, result)
})
}
if (result === -1) {
// end of stream
return cb(null, null)
}
if (result === 0) {
var self = this
this.pq.once('readable', function () {
self.pq.stopReader()
self.pq.consumeInput()
self._consumeBuffer(cb)
})
return this.pq.startReader()
}
cb(new Error('Unrecognized read status: ' + result))
}
CopyStream.prototype._read = function (size) {
if (this._reading) return
this._reading = true
// console.log('read begin');
var self = this
this._consumeBuffer(function (err, buffer) {
self._reading = false
if (err) {
return self.emit('error', err)
}
if (buffer === false) {
// nothing to read for now, return
return
}
self.push(buffer)
})
}
var consumeResults = function (pq, cb) {
var cleanup = function () {
pq.removeListener('readable', onReadable)
pq.stopReader()
}
var readError = function (message) {
cleanup()
return cb(new Error(message || pq.errorMessage()))
}
var onReadable = function () {
// read waiting data from the socket
// e.g. clear the pending 'select'
if (!pq.consumeInput()) {
return readError()
}
// check if there is still outstanding data
// if so, wait for it all to come in
if (pq.isBusy()) {
return
}
// load our result object
pq.getResult()
// "read until results return null"
// or in our case ensure we only have one result
if (pq.getResult() && pq.resultStatus() !== 'PGRES_COPY_OUT') {
return readError('Only one result at a time is accepted')
}
if (pq.resultStatus() === 'PGRES_FATAL_ERROR') {
return readError()
}
cleanup()
return cb(null)
}
pq.on('readable', onReadable)
pq.startReader()
}