mirror of
https://github.com/brianc/node-postgres.git
synced 2026-01-25 16:03:13 +00:00
Add Cloudflare Worker compatible socket
This commit is contained in:
parent
5532ca51db
commit
07553428e9
33
packages/pg-cloudflare/README.md
Normal file
33
packages/pg-cloudflare/README.md
Normal file
@ -0,0 +1,33 @@
|
||||
# pg-cloudflare
|
||||
|
||||
A socket implementation that can run on Cloudflare Workers using native TCP connections.
|
||||
|
||||
## install
|
||||
|
||||
```
|
||||
npm i --save-dev pg-cloudflare
|
||||
```
|
||||
|
||||
### license
|
||||
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2023 Brian M. Carlson
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
27
packages/pg-cloudflare/package.json
Normal file
27
packages/pg-cloudflare/package.json
Normal file
@ -0,0 +1,27 @@
|
||||
{
|
||||
"name": "pg-cloudflare",
|
||||
"version": "1.0.0",
|
||||
"description": "A socket implementation that can run on Cloudflare Workers using native TCP connections.",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"devDependencies": {
|
||||
"ts-node": "^8.5.4",
|
||||
"typescript": "^4.0.3"
|
||||
},
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"build:watch": "tsc --watch",
|
||||
"prepublish": "yarn build",
|
||||
"test": "echo e2e test in pg package"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git://github.com/brianc/node-postgres.git",
|
||||
"directory": "packages/pg-cloudflare"
|
||||
},
|
||||
"files": [
|
||||
"/dist/*{js,ts,map}",
|
||||
"/src"
|
||||
]
|
||||
}
|
||||
164
packages/pg-cloudflare/src/index.ts
Normal file
164
packages/pg-cloudflare/src/index.ts
Normal file
@ -0,0 +1,164 @@
|
||||
import { SocketOptions, Socket, TlsOptions } from 'cloudflare:sockets'
|
||||
import { EventEmitter } from 'events'
|
||||
|
||||
/**
|
||||
* Wrapper around the Cloudflare built-in socket that can be used by the `Connection`.
|
||||
*/
|
||||
export class CloudflareSocket extends EventEmitter {
|
||||
writable = false
|
||||
destroyed = false
|
||||
|
||||
private _upgrading = false
|
||||
private _upgraded = false
|
||||
private _cfSocket: Socket | null = null
|
||||
private _cfWriter: WritableStreamDefaultWriter | null = null
|
||||
private _cfReader: ReadableStreamDefaultReader | null = null
|
||||
|
||||
constructor(readonly ssl: boolean) {
|
||||
super()
|
||||
}
|
||||
|
||||
setNoDelay() {
|
||||
return this
|
||||
}
|
||||
setKeepAlive() {
|
||||
return this
|
||||
}
|
||||
ref() {
|
||||
return this
|
||||
}
|
||||
unref() {
|
||||
return this
|
||||
}
|
||||
|
||||
async connect(port: number, host: string, connectListener?: (...args: unknown[]) => void) {
|
||||
try {
|
||||
log('connecting')
|
||||
if (connectListener) this.once('connect', connectListener)
|
||||
|
||||
const options: SocketOptions = this.ssl ? { secureTransport: 'starttls' } : {}
|
||||
const { connect } = await import('cloudflare:sockets')
|
||||
this._cfSocket = connect(`${host}:${port}`, options)
|
||||
this._cfWriter = this._cfSocket.writable.getWriter()
|
||||
this._addClosedHandler()
|
||||
|
||||
this._cfReader = this._cfSocket.readable.getReader()
|
||||
if (this.ssl) {
|
||||
this._listenOnce().catch((e) => this.emit('error', e))
|
||||
} else {
|
||||
this._listen().catch((e) => this.emit('error', e))
|
||||
}
|
||||
|
||||
await this._cfWriter!.ready
|
||||
log('socket ready')
|
||||
this.writable = true
|
||||
this.emit('connect')
|
||||
|
||||
return this
|
||||
} catch (e) {
|
||||
this.emit('error', e)
|
||||
}
|
||||
}
|
||||
|
||||
async _listen() {
|
||||
while (true) {
|
||||
log('awaiting receive from CF socket')
|
||||
const { done, value } = await this._cfReader!.read()
|
||||
log('CF socket received:', done, value)
|
||||
if (done) {
|
||||
log('done')
|
||||
break
|
||||
}
|
||||
this.emit('data', Buffer.from(value))
|
||||
}
|
||||
}
|
||||
|
||||
async _listenOnce() {
|
||||
log('awaiting first receive from CF socket')
|
||||
const { done, value } = await this._cfReader!.read()
|
||||
log('First CF socket received:', done, value)
|
||||
this.emit('data', Buffer.from(value))
|
||||
}
|
||||
|
||||
write(
|
||||
data: Uint8Array | string,
|
||||
encoding: BufferEncoding = 'utf8',
|
||||
callback: (...args: unknown[]) => void = () => {}
|
||||
) {
|
||||
if (data.length === 0) return callback()
|
||||
if (typeof data === 'string') data = Buffer.from(data, encoding)
|
||||
|
||||
log('sending data direct:', data)
|
||||
this._cfWriter!.write(data).then(
|
||||
() => {
|
||||
log('data sent')
|
||||
callback()
|
||||
},
|
||||
(err) => {
|
||||
log('send error', err)
|
||||
callback(err)
|
||||
}
|
||||
)
|
||||
return true
|
||||
}
|
||||
|
||||
end(data = Buffer.alloc(0), encoding: BufferEncoding = 'utf8', callback: (...args: unknown[]) => void = () => {}) {
|
||||
log('ending CF socket')
|
||||
this.write(data, encoding, (err) => {
|
||||
this._cfSocket!.close()
|
||||
if (callback) callback(err)
|
||||
})
|
||||
return this
|
||||
}
|
||||
|
||||
destroy(reason: string) {
|
||||
log('destroying CF socket', reason)
|
||||
this.destroyed = true
|
||||
return this.end()
|
||||
}
|
||||
|
||||
startTls(options: TlsOptions) {
|
||||
if (this._upgraded) {
|
||||
// Don't try to upgrade again.
|
||||
this.emit('error', 'Cannot call `startTls()` more than once on a socket')
|
||||
return
|
||||
}
|
||||
this._cfWriter!.releaseLock()
|
||||
this._cfReader!.releaseLock()
|
||||
this._upgrading = true
|
||||
this._cfSocket = this._cfSocket!.startTls(options)
|
||||
this._cfWriter = this._cfSocket.writable.getWriter()
|
||||
this._cfReader = this._cfSocket.readable.getReader()
|
||||
this._addClosedHandler()
|
||||
this._listen().catch((e) => this.emit('error', e))
|
||||
}
|
||||
|
||||
_addClosedHandler() {
|
||||
this._cfSocket!.closed.then(() => {
|
||||
if (!this._upgrading) {
|
||||
log('CF socket closed')
|
||||
this._cfSocket = null
|
||||
this.emit('close')
|
||||
} else {
|
||||
this._upgrading = false
|
||||
this._upgraded = true
|
||||
}
|
||||
}).catch((e) => this.emit('error', e))
|
||||
}
|
||||
}
|
||||
|
||||
const debug = false
|
||||
|
||||
function dump(data: unknown) {
|
||||
if (data instanceof Uint8Array || data instanceof ArrayBuffer) {
|
||||
const hex = Buffer.from(data).toString('hex')
|
||||
const str = new TextDecoder().decode(data)
|
||||
return `\n>>> STR: "${str.replace(/\n/g, '\\n')}"\n>>> HEX: ${hex}\n`
|
||||
} else {
|
||||
return data
|
||||
}
|
||||
}
|
||||
|
||||
function log(...args: unknown[]) {
|
||||
debug && console.log(...args.map(dump))
|
||||
}
|
||||
25
packages/pg-cloudflare/src/types.d.ts
vendored
Normal file
25
packages/pg-cloudflare/src/types.d.ts
vendored
Normal file
@ -0,0 +1,25 @@
|
||||
declare module 'cloudflare:sockets' {
|
||||
export class Socket {
|
||||
public readonly readable: any
|
||||
public readonly writable: any
|
||||
public readonly closed: Promise<void>
|
||||
public close(): Promise<void>
|
||||
public startTls(options: TlsOptions): Socket
|
||||
}
|
||||
|
||||
export type TlsOptions = {
|
||||
expectedServerHostname?: string
|
||||
}
|
||||
|
||||
export type SocketAddress = {
|
||||
hostname: string
|
||||
port: number
|
||||
}
|
||||
|
||||
export type SocketOptions = {
|
||||
secureTransport?: 'off' | 'on' | 'starttls'
|
||||
allowHalfOpen?: boolean
|
||||
}
|
||||
|
||||
export function connect(address: string | SocketAddress, options?: SocketOptions): Socket
|
||||
}
|
||||
25
packages/pg-cloudflare/tsconfig.json
Normal file
25
packages/pg-cloudflare/tsconfig.json
Normal file
@ -0,0 +1,25 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"module": "ES2020",
|
||||
"esModuleInterop": true,
|
||||
"allowSyntheticDefaultImports": true,
|
||||
"strict": true,
|
||||
"target": "ES2020",
|
||||
"noImplicitAny": true,
|
||||
"moduleResolution": "node",
|
||||
"sourceMap": true,
|
||||
"outDir": "dist",
|
||||
"incremental": true,
|
||||
"baseUrl": ".",
|
||||
"declaration": true,
|
||||
"paths": {
|
||||
"*": [
|
||||
"node_modules/*",
|
||||
"src/types/*"
|
||||
]
|
||||
}
|
||||
},
|
||||
"include": [
|
||||
"src/**/*"
|
||||
]
|
||||
}
|
||||
@ -4,6 +4,7 @@ var net = require('net')
|
||||
var EventEmitter = require('events').EventEmitter
|
||||
|
||||
const { parse, serialize } = require('pg-protocol')
|
||||
const { getStream, getSecureStream } = require('./stream')
|
||||
|
||||
const flushBuffer = serialize.flush()
|
||||
const syncBuffer = serialize.sync()
|
||||
@ -15,7 +16,7 @@ class Connection extends EventEmitter {
|
||||
super()
|
||||
config = config || {}
|
||||
|
||||
this.stream = config.stream || new net.Socket()
|
||||
this.stream = config.stream || getStream(config.ssl)
|
||||
if (typeof this.stream === 'function') {
|
||||
this.stream = this.stream(config)
|
||||
}
|
||||
@ -79,7 +80,6 @@ class Connection extends EventEmitter {
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('There was an error establishing an SSL connection'))
|
||||
}
|
||||
var tls = require('tls')
|
||||
const options = {
|
||||
socket: self.stream,
|
||||
}
|
||||
@ -97,7 +97,7 @@ class Connection extends EventEmitter {
|
||||
options.servername = host
|
||||
}
|
||||
try {
|
||||
self.stream = tls.connect(options)
|
||||
self.stream = getSecureStream(options)
|
||||
} catch (err) {
|
||||
return self.emit('error', err)
|
||||
}
|
||||
|
||||
28
packages/pg/lib/stream.js
Normal file
28
packages/pg/lib/stream.js
Normal file
@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Get a socket stream compatible with the current runtime environment.
|
||||
* @returns {Duplex}
|
||||
*/
|
||||
module.exports.getStream = function getStream(ssl) {
|
||||
const net = require('net')
|
||||
if (typeof net.Socket === 'function') {
|
||||
return new net.Socket()
|
||||
} else {
|
||||
const { CloudflareSocket } = require('pg-cloudflare')
|
||||
return new CloudflareSocket(ssl)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a TLS secured socket, compatible with the current environment,
|
||||
* using the socket and other settings given in `options`.
|
||||
* @returns {Duplex}
|
||||
*/
|
||||
module.exports.getSecureStream = function getSecureStream(options) {
|
||||
var tls = require('tls')
|
||||
if (tls.connect) {
|
||||
return tls.connect(options)
|
||||
} else {
|
||||
options.socket.startTls(options)
|
||||
return options.socket
|
||||
}
|
||||
}
|
||||
@ -34,6 +34,9 @@
|
||||
"co": "4.6.0",
|
||||
"pg-copy-streams": "0.3.0"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"pg-cloudflare": "1.x"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"pg-native": ">=3.0.1"
|
||||
},
|
||||
|
||||
@ -6,6 +6,7 @@
|
||||
},
|
||||
"include": [],
|
||||
"references": [
|
||||
{"path": "./packages/pg-cloudflare"},
|
||||
{"path": "./packages/pg-query-stream"},
|
||||
{"path": "./packages/pg-protocol"}
|
||||
]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user