mirror of
https://github.com/typeorm/typeorm.git
synced 2025-12-08 21:26:23 +00:00
fix: cleanup after streaming in sap hana (#11399)
This commit is contained in:
parent
b9842e3be9
commit
fadad1a74c
@ -208,20 +208,25 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
|
||||
const isInsertQuery = query.substr(0, 11) === "INSERT INTO"
|
||||
|
||||
if (parameters?.some(Array.isArray)) {
|
||||
statement = await promisify(
|
||||
databaseConnection.prepare.bind(databaseConnection),
|
||||
)(query)
|
||||
statement = await promisify(databaseConnection.prepare).call(
|
||||
databaseConnection,
|
||||
query,
|
||||
)
|
||||
}
|
||||
|
||||
let raw: any
|
||||
try {
|
||||
raw = statement
|
||||
? await promisify(statement.exec.bind(statement))(
|
||||
? await promisify(statement.exec).call(
|
||||
statement,
|
||||
parameters,
|
||||
)
|
||||
: await promisify(
|
||||
databaseConnection.exec.bind(databaseConnection),
|
||||
)(query, parameters, {})
|
||||
: await promisify(databaseConnection.exec).call(
|
||||
databaseConnection,
|
||||
query,
|
||||
parameters,
|
||||
{},
|
||||
)
|
||||
} catch (err) {
|
||||
throw new QueryFailedError(query, parameters, err)
|
||||
}
|
||||
@ -333,20 +338,61 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
|
||||
): Promise<ReadStream> {
|
||||
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
|
||||
|
||||
const databaseConnection = await this.connect()
|
||||
this.driver.connection.logger.logQuery(query, parameters, this)
|
||||
const release = await this.lock.acquire()
|
||||
let statement: any
|
||||
let resultSet: any
|
||||
|
||||
const prepareAsync = promisify(databaseConnection.prepare).bind(
|
||||
databaseConnection,
|
||||
)
|
||||
const statement = await prepareAsync(query)
|
||||
const resultSet = statement.executeQuery(parameters)
|
||||
const stream = this.driver.streamClient.createObjectStream(resultSet)
|
||||
const cleanup = async () => {
|
||||
if (resultSet) {
|
||||
await promisify(resultSet.close).call(resultSet)
|
||||
}
|
||||
if (statement) {
|
||||
await promisify(statement.drop).call(statement)
|
||||
}
|
||||
release()
|
||||
}
|
||||
|
||||
if (onEnd) stream.on("end", onEnd)
|
||||
if (onError) stream.on("error", onError)
|
||||
try {
|
||||
const databaseConnection = await this.connect()
|
||||
this.driver.connection.logger.logQuery(query, parameters, this)
|
||||
|
||||
return stream
|
||||
statement = await promisify(databaseConnection.prepare).call(
|
||||
databaseConnection,
|
||||
query,
|
||||
)
|
||||
resultSet = await promisify(statement.executeQuery).call(
|
||||
statement,
|
||||
parameters,
|
||||
)
|
||||
|
||||
const stream =
|
||||
this.driver.streamClient.createObjectStream(resultSet)
|
||||
stream.on("end", async () => {
|
||||
await cleanup()
|
||||
onEnd?.()
|
||||
})
|
||||
stream.on("error", async (error: Error) => {
|
||||
this.driver.connection.logger.logQueryError(
|
||||
error,
|
||||
query,
|
||||
parameters,
|
||||
this,
|
||||
)
|
||||
await cleanup()
|
||||
onError?.(error)
|
||||
})
|
||||
|
||||
return stream
|
||||
} catch (error) {
|
||||
this.driver.connection.logger.logQueryError(
|
||||
error,
|
||||
query,
|
||||
parameters,
|
||||
this,
|
||||
)
|
||||
await cleanup()
|
||||
throw new QueryFailedError(query, parameters, error)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -52,7 +52,10 @@ describe("query runner > stream", () => {
|
||||
|
||||
readStream.on("data", (row) => data.push(row))
|
||||
|
||||
await new Promise((ok) => readStream.once("end", ok))
|
||||
await new Promise((ok, fail) => {
|
||||
readStream.once("end", ok)
|
||||
readStream.once("error", fail)
|
||||
})
|
||||
|
||||
expect(data).to.have.length(4)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user