feat: BeforeQuery and AfterQuery events (#10234)

* feat: add query events

Co-authored-by: mnguyen081002 <3langn@gmail.com>
Co-authored-by: andrewda <andrew@dassonville.dev>

* fix: functional cache

* test: added test for issue-3302

* refactor: use broadcasterResult.wait()

* test: use sandbox to restoring

* test: fix test from #3302 with cockroachdb

* docs: add beforeQuery and afterQuery documents

---------

Co-authored-by: andrewda <andrew@dassonville.dev>
Co-authored-by: Dmitry Zotov <dmzt08@gmail.com>
This commit is contained in:
Minh Nguyên 2024-01-02 14:23:25 +07:00 committed by GitHub
parent 0f11739351
commit 5c28154cbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 712 additions and 82 deletions

View File

@ -238,6 +238,20 @@ export class PostSubscriber implements EntitySubscriberInterface {
console.log(`AFTER ENTITY LOADED: `, entity)
}
/**
* Called before query execution.
*/
beforeQuery(event: BeforeQueryEvent<any>) {
console.log(`BEFORE QUERY: `, event.query)
}
/**
* Called after query execution.
*/
afterQuery(event: AfterQueryEvent<any>) {
console.log(`AFTER QUERY: `, event.query)
}
/**
* Called before entity insertion.
*/

View File

@ -25,6 +25,7 @@ import { ReplicationMode } from "../types/ReplicationMode"
import { TypeORMError } from "../../error"
import { MetadataTableType } from "../types/MetadataTableType"
import { InstanceChecker } from "../../util/InstanceChecker"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
import { VersionUtils } from "../../util/VersionUtils"
/**
@ -271,7 +272,15 @@ export class CockroachQueryRunner
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
const databaseConnection = await this.connect()
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
if (this.isTransactionActive && this.storeQueries) {
@ -323,6 +332,16 @@ export class CockroachQueryRunner
result.raw = raw.rows
}
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (useStructuredResult) {
return result
} else {
@ -365,9 +384,19 @@ export class CockroachQueryRunner
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw new QueryFailedError(query, parameters, err)
}
} finally {
await broadcasterResult.wait()
}
}

View File

@ -6,6 +6,7 @@ import { CordovaDriver } from "./CordovaDriver"
import { Broadcaster } from "../../subscriber/Broadcaster"
import { TypeORMError } from "../../error"
import { QueryResult } from "../../query-runner/QueryResult"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single sqlite database connection.
@ -52,7 +53,15 @@ export class CordovaQueryRunner extends AbstractSqliteQueryRunner {
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
const databaseConnection = await this.connect()
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
try {
@ -70,6 +79,17 @@ export class CordovaQueryRunner extends AbstractSqliteQueryRunner {
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -108,7 +128,19 @@ export class CordovaQueryRunner extends AbstractSqliteQueryRunner {
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw new QueryFailedError(query, parameters, err)
} finally {
await broadcasterResult.wait()
}
}

View File

@ -5,6 +5,7 @@ import { TransactionNotStartedError } from "../../error/TransactionNotStartedErr
import { ExpoDriver } from "./ExpoDriver"
import { Broadcaster } from "../../subscriber/Broadcaster"
import { QueryResult } from "../../query-runner/QueryResult"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
// Needed to satisfy the Typescript compiler
interface IResultSet {
@ -164,7 +165,15 @@ export class ExpoQueryRunner extends AbstractSqliteQueryRunner {
return new Promise<any>(async (ok, fail) => {
const databaseConnection = await this.connect()
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
// All Expo SQL queries are executed in a transaction context
databaseConnection.transaction(
@ -176,13 +185,25 @@ export class ExpoQueryRunner extends AbstractSqliteQueryRunner {
this.transaction.executeSql(
query,
parameters,
(t: ITransaction, raw: IResultSet) => {
async (t: ITransaction, raw: IResultSet) => {
// log slow queries if maxQueryExecution time is set
const maxQueryExecutionTime =
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime =
queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
await broadcasterResult.wait()
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -222,13 +243,24 @@ export class ExpoQueryRunner extends AbstractSqliteQueryRunner {
ok(result.raw)
}
},
(t: ITransaction, err: any) => {
async (t: ITransaction, err: any) => {
this.driver.connection.logger.logQueryError(
err,
query,
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
await broadcasterResult.wait()
fail(new QueryFailedError(query, parameters, err))
},
)

View File

@ -26,6 +26,7 @@ import { ReplicationMode } from "../types/ReplicationMode"
import { TypeORMError } from "../../error"
import { MetadataTableType } from "../types/MetadataTableType"
import { InstanceChecker } from "../../util/InstanceChecker"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single mysql database connection.
@ -190,19 +191,29 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
return new Promise(async (ok, fail) => {
const broadcasterResult = new BroadcasterResult()
try {
const databaseConnection = await this.connect()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
databaseConnection.query(
query,
parameters,
(err: any, raw: any) => {
async (err: any, raw: any) => {
// log slow queries if maxQueryExecution time is set
const maxQueryExecutionTime =
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -221,11 +232,31 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
return fail(
new QueryFailedError(query, parameters, err),
)
}
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
const result = new QueryResult()
result.raw = raw
@ -249,6 +280,8 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
)
} catch (err) {
fail(err)
} finally {
await broadcasterResult.wait()
}
})
}

View File

@ -24,6 +24,7 @@ import { TypeORMError } from "../../error"
import { QueryResult } from "../../query-runner/QueryResult"
import { MetadataTableType } from "../types/MetadataTableType"
import { InstanceChecker } from "../../util/InstanceChecker"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single oracle database connection.
@ -199,8 +200,15 @@ export class OracleQueryRunner extends BaseQueryRunner implements QueryRunner {
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
const databaseConnection = await this.connect()
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
try {
@ -220,6 +228,17 @@ export class OracleQueryRunner extends BaseQueryRunner implements QueryRunner {
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -273,7 +292,19 @@ export class OracleQueryRunner extends BaseQueryRunner implements QueryRunner {
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw new QueryFailedError(query, parameters, err)
} finally {
await broadcasterResult.wait()
}
}

View File

@ -26,6 +26,7 @@ import { IsolationLevel } from "../types/IsolationLevel"
import { MetadataTableType } from "../types/MetadataTableType"
import { ReplicationMode } from "../types/ReplicationMode"
import { PostgresDriver } from "./PostgresDriver"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single postgres database connection.
@ -245,8 +246,15 @@ export class PostgresQueryRunner
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
const databaseConnection = await this.connect()
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
try {
const queryStartTime = +new Date()
const raw = await databaseConnection.query(query, parameters)
@ -255,6 +263,17 @@ export class PostgresQueryRunner
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -299,7 +318,19 @@ export class PostgresQueryRunner
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw new QueryFailedError(query, parameters, err)
} finally {
await broadcasterResult.wait()
}
}

View File

@ -5,6 +5,7 @@ import { AbstractSqliteQueryRunner } from "../sqlite-abstract/AbstractSqliteQuer
import { ReactNativeDriver } from "./ReactNativeDriver"
import { Broadcaster } from "../../subscriber/Broadcaster"
import { QueryResult } from "../../query-runner/QueryResult"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single sqlite database connection.
@ -53,17 +54,36 @@ export class ReactNativeQueryRunner extends AbstractSqliteQueryRunner {
return new Promise(async (ok, fail) => {
const databaseConnection = await this.connect()
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
databaseConnection.executeSql(
query,
parameters,
(raw: any) => {
async (raw: any) => {
// log slow queries if maxQueryExecution time is set
const maxQueryExecutionTime =
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -75,6 +95,9 @@ export class ReactNativeQueryRunner extends AbstractSqliteQueryRunner {
this,
)
if (broadcasterResult.promises.length > 0)
await Promise.all(broadcasterResult.promises)
const result = new QueryResult()
if (raw?.hasOwnProperty("rowsAffected")) {
@ -102,13 +125,24 @@ export class ReactNativeQueryRunner extends AbstractSqliteQueryRunner {
ok(result.raw)
}
},
(err: any) => {
async (err: any) => {
this.driver.connection.logger.logQueryError(
err,
query,
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
await broadcasterResult.wait()
fail(new QueryFailedError(query, parameters, err))
},
)

View File

@ -27,6 +27,7 @@ import { QueryLock } from "../../query-runner/QueryLock"
import { MetadataTableType } from "../types/MetadataTableType"
import { InstanceChecker } from "../../util/InstanceChecker"
import { promisify } from "util"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single SQL Server database connection.
@ -194,11 +195,18 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
let statement: any
const result = new QueryResult()
const broadcasterResult = new BroadcasterResult()
try {
const databaseConnection = await this.connect()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
const isInsertQuery = query.substr(0, 11) === "INSERT INTO"
@ -226,6 +234,17 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
this.driver.connection.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -270,20 +289,31 @@ export class SapQueryRunner extends BaseQueryRunner implements QueryRunner {
result.raw = identityValueResult[0]["CURRENT_IDENTITY_VALUE()"]
result.records = identityValueResult
}
} catch (e) {
} catch (err) {
this.driver.connection.logger.logQueryError(
e,
err,
query,
parameters,
this,
)
throw e
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw err
} finally {
// Never forget to drop the statement we reserved
if (statement?.drop) {
await new Promise<void>((ok) => statement.drop(() => ok()))
}
await broadcasterResult.wait()
// Always release the lock.
release()
}

View File

@ -24,6 +24,7 @@ import { TypeORMError } from "../../error"
import { QueryResult } from "../../query-runner/QueryResult"
import { MetadataTableType } from "../types/MetadataTableType"
import { SpannerDriver } from "./SpannerDriver"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single postgres database connection.
@ -155,6 +156,8 @@ export class SpannerQueryRunner extends BaseQueryRunner implements QueryRunner {
): Promise<any> {
if (this.isReleased) throw new QueryRunnerAlreadyReleasedError()
const broadcasterResult = new BroadcasterResult()
try {
const queryStartTime = +new Date()
await this.connect()
@ -182,6 +185,12 @@ export class SpannerQueryRunner extends BaseQueryRunner implements QueryRunner {
try {
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
rawResult = await executor.run({
sql: query,
params: parameters
@ -209,6 +218,17 @@ export class SpannerQueryRunner extends BaseQueryRunner implements QueryRunner {
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
rawResult,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -240,8 +260,18 @@ export class SpannerQueryRunner extends BaseQueryRunner implements QueryRunner {
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw new QueryFailedError(query, parameters, err)
} finally {
await broadcasterResult.wait()
}
}

View File

@ -35,7 +35,6 @@ export class SqliteDriver extends AbstractSqliteDriver {
constructor(connection: DataSource) {
super(connection)
this.connection = connection
this.options = connection.options as SqliteConnectionOptions
this.database = this.options.database

View File

@ -6,6 +6,7 @@ import { SqliteDriver } from "./SqliteDriver"
import { Broadcaster } from "../../subscriber/Broadcaster"
import { ConnectionIsNotSetError } from "../../error/ConnectionIsNotSetError"
import { QueryResult } from "../../query-runner/QueryResult"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single sqlite database connection.
@ -57,84 +58,121 @@ export class SqliteQueryRunner extends AbstractSqliteQueryRunner {
const connection = this.driver.connection
const options = connection.options as SqliteConnectionOptions
const maxQueryExecutionTime = this.driver.options.maxQueryExecutionTime
const broadcasterResult = new BroadcasterResult()
const broadcaster = this.broadcaster
broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
if (!connection.isInitialized) {
throw new ConnectionIsNotSetError("sqlite")
}
return new Promise(async (ok, fail) => {
const databaseConnection = await this.connect()
this.driver.connection.logger.logQuery(query, parameters, this)
const queryStartTime = +new Date()
const isInsertQuery = query.startsWith("INSERT ")
const isDeleteQuery = query.startsWith("DELETE ")
const isUpdateQuery = query.startsWith("UPDATE ")
try {
const databaseConnection = await this.connect()
this.driver.connection.logger.logQuery(query, parameters, this)
const queryStartTime = +new Date()
const isInsertQuery = query.startsWith("INSERT ")
const isDeleteQuery = query.startsWith("DELETE ")
const isUpdateQuery = query.startsWith("UPDATE ")
const execute = async () => {
if (isInsertQuery || isDeleteQuery || isUpdateQuery) {
await databaseConnection.run(query, parameters, handler)
} else {
await databaseConnection.all(query, parameters, handler)
const execute = async () => {
if (isInsertQuery || isDeleteQuery || isUpdateQuery) {
await databaseConnection.run(query, parameters, handler)
} else {
await databaseConnection.all(query, parameters, handler)
}
}
}
const handler = function (err: any, rows: any) {
if (err && err.toString().indexOf("SQLITE_BUSY:") !== -1) {
const handler = function (err: any, rows: any) {
if (err && err.toString().indexOf("SQLITE_BUSY:") !== -1) {
if (
typeof options.busyErrorRetry === "number" &&
options.busyErrorRetry > 0
) {
setTimeout(execute, options.busyErrorRetry)
return
}
}
// log slow queries if maxQueryExecution time is set
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
if (
typeof options.busyErrorRetry === "number" &&
options.busyErrorRetry > 0
) {
setTimeout(execute, options.busyErrorRetry)
return
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
)
connection.logger.logQuerySlow(
queryExecutionTime,
query,
parameters,
this,
)
if (err) {
connection.logger.logQueryError(
err,
query,
parameters,
this,
)
broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
return fail(
new QueryFailedError(query, parameters, err),
)
} else {
const result = new QueryResult()
if (isInsertQuery) {
result.raw = this["lastID"]
} else {
result.raw = rows
}
broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
result.raw,
undefined,
)
if (Array.isArray(rows)) {
result.records = rows
}
result.affected = this["changes"]
if (useStructuredResult) {
ok(result)
} else {
ok(result.raw)
}
}
}
// log slow queries if maxQueryExecution time is set
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
)
connection.logger.logQuerySlow(
queryExecutionTime,
query,
parameters,
this,
)
if (err) {
connection.logger.logQueryError(
err,
query,
parameters,
this,
)
fail(new QueryFailedError(query, parameters, err))
} else {
const result = new QueryResult()
if (isInsertQuery) {
result.raw = this["lastID"]
} else {
result.raw = rows
}
if (Array.isArray(rows)) {
result.records = rows
}
result.affected = this["changes"]
if (useStructuredResult) {
ok(result)
} else {
ok(result.raw)
}
}
await execute()
} catch (err) {
fail(err)
} finally {
await broadcasterResult.wait()
console.log("Finally", query, broadcasterResult.count)
}
await execute()
})
}
}

View File

@ -4,6 +4,7 @@ import { SqljsDriver } from "./SqljsDriver"
import { Broadcaster } from "../../subscriber/Broadcaster"
import { QueryFailedError } from "../../error/QueryFailedError"
import { QueryResult } from "../../query-runner/QueryResult"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single sqlite database connection.
@ -84,7 +85,15 @@ export class SqljsQueryRunner extends AbstractSqliteQueryRunner {
const command = query.trim().split(" ", 1)[0]
const databaseConnection = this.driver.databaseConnection
const broadcasterResult = new BroadcasterResult()
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const queryStartTime = +new Date()
let statement: any
try {
@ -102,6 +111,7 @@ export class SqljsQueryRunner extends AbstractSqliteQueryRunner {
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -119,6 +129,16 @@ export class SqljsQueryRunner extends AbstractSqliteQueryRunner {
records.push(statement.getAsObject())
}
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
records,
undefined,
)
const result = new QueryResult()
result.affected = databaseConnection.getRowsModified()
@ -136,18 +156,30 @@ export class SqljsQueryRunner extends AbstractSqliteQueryRunner {
} else {
return result.raw
}
} catch (e) {
} catch (err) {
if (statement) {
statement.free()
}
this.driver.connection.logger.logQueryError(
e,
err,
query,
parameters,
this,
)
throw new QueryFailedError(query, parameters, e)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw new QueryFailedError(query, parameters, err)
} finally {
await broadcasterResult.wait()
}
}
}

View File

@ -27,6 +27,7 @@ import { TypeORMError } from "../../error"
import { QueryLock } from "../../query-runner/QueryLock"
import { MetadataTableType } from "../types/MetadataTableType"
import { InstanceChecker } from "../../util/InstanceChecker"
import { BroadcasterResult } from "../../subscriber/BroadcasterResult"
/**
* Runs queries on a single SQL Server database connection.
@ -209,8 +210,16 @@ export class SqlServerQueryRunner
const release = await this.lock.acquire()
const broadcasterResult = new BroadcasterResult()
try {
this.driver.connection.logger.logQuery(query, parameters, this)
this.broadcaster.broadcastBeforeQueryEvent(
broadcasterResult,
query,
parameters,
)
const pool = await (this.mode === "slave"
? this.driver.obtainSlaveConnection()
: this.driver.obtainMasterConnection())
@ -246,6 +255,17 @@ export class SqlServerQueryRunner
this.driver.options.maxQueryExecutionTime
const queryEndTime = +new Date()
const queryExecutionTime = queryEndTime - queryStartTime
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
true,
queryExecutionTime,
raw,
undefined,
)
if (
maxQueryExecutionTime &&
queryExecutionTime > maxQueryExecutionTime
@ -298,8 +318,20 @@ export class SqlServerQueryRunner
parameters,
this,
)
this.broadcaster.broadcastAfterQueryEvent(
broadcasterResult,
query,
parameters,
false,
undefined,
undefined,
err,
)
throw err
} finally {
await broadcasterResult.wait()
release()
}
}

View File

@ -458,7 +458,7 @@ export class EntityMetadata {
afterLoadListeners: EntityListenerMetadata[] = []
/**
* Listener metadatas with "AFTER INSERT" type.
* Listener metadatas with "BEFORE INSERT" type.
*/
beforeInsertListeners: EntityListenerMetadata[] = []
@ -468,7 +468,7 @@ export class EntityMetadata {
afterInsertListeners: EntityListenerMetadata[] = []
/**
* Listener metadatas with "AFTER UPDATE" type.
* Listener metadatas with "BEFORE UPDATE" type.
*/
beforeUpdateListeners: EntityListenerMetadata[] = []
@ -478,7 +478,7 @@ export class EntityMetadata {
afterUpdateListeners: EntityListenerMetadata[] = []
/**
* Listener metadatas with "AFTER REMOVE" type.
* Listener metadatas with "BEFORE REMOVE" type.
*/
beforeRemoveListeners: EntityListenerMetadata[] = []

View File

@ -8,6 +8,9 @@ import { RelationMetadata } from "../metadata/RelationMetadata"
import { ObjectUtils } from "../util/ObjectUtils"
interface BroadcasterEvents {
BeforeQuery: () => void
AfterQuery: () => void
BeforeTransactionCommit: () => void
AfterTransactionCommit: () => void
BeforeTransactionStart: () => void
@ -401,6 +404,66 @@ export class Broadcaster {
}
}
/**
* Broadcasts "BEFORE_QUERY" event.
*/
broadcastBeforeQueryEvent(
result: BroadcasterResult,
query: string,
parameters: undefined | any[],
): void {
if (this.queryRunner.connection.subscribers.length) {
this.queryRunner.connection.subscribers.forEach((subscriber) => {
if (subscriber.beforeQuery) {
const executionResult = subscriber.beforeQuery({
connection: this.queryRunner.connection,
queryRunner: this.queryRunner,
manager: this.queryRunner.manager,
query: query,
parameters: parameters,
})
if (executionResult instanceof Promise)
result.promises.push(executionResult)
result.count++
}
})
}
}
/**
* Broadcasts "AFTER_QUERY" event.
*/
broadcastAfterQueryEvent(
result: BroadcasterResult,
query: string,
parameters: undefined | any[],
success: boolean,
executionTime: undefined | number,
rawResults: undefined | any,
error: undefined | any,
): void {
if (this.queryRunner.connection.subscribers.length) {
this.queryRunner.connection.subscribers.forEach((subscriber) => {
if (subscriber.afterQuery) {
const executionResult = subscriber.afterQuery({
connection: this.queryRunner.connection,
queryRunner: this.queryRunner,
manager: this.queryRunner.manager,
query: query,
parameters: parameters,
success: success,
executionTime: executionTime,
rawResults: rawResults,
error: error,
})
if (executionResult instanceof Promise)
result.promises.push(executionResult)
result.count++
}
})
}
}
/**
* Broadcasts "BEFORE_TRANSACTION_START" event.
*/

View File

@ -7,6 +7,7 @@ import { InsertEvent } from "./event/InsertEvent"
import { LoadEvent } from "./event/LoadEvent"
import { SoftRemoveEvent } from "./event/SoftRemoveEvent"
import { RecoverEvent } from "./event/RecoverEvent"
import { AfterQueryEvent, BeforeQueryEvent } from "./event/QueryEvent"
/**
* Classes that implement this interface are subscribers that subscribe for the specific events in the ORM.
@ -28,6 +29,16 @@ export interface EntitySubscriberInterface<Entity = any> {
*/
afterLoad?(entity: Entity, event?: LoadEvent<Entity>): Promise<any> | void
/**
* Called before query is executed.
*/
beforeQuery?(event: BeforeQueryEvent<Entity>): Promise<any> | void
/**
* Called after query is executed.
*/
afterQuery?(event: AfterQueryEvent<Entity>): Promise<any> | void
/**
* Called before entity is inserted to the database.
*/

View File

@ -0,0 +1,59 @@
import { EntityManager } from "../../entity-manager/EntityManager"
import { DataSource } from "../../data-source/DataSource"
import { QueryRunner } from "../../query-runner/QueryRunner"
/**
* BeforeQueryEvent is an object that broadcaster sends to the entity subscriber before query is ran against the database.
*/
export interface QueryEvent<Entity> {
/**
* Connection used in the event.
*/
connection: DataSource
/**
* QueryRunner used in the event transaction.
* All database operations in the subscribed event listener should be performed using this query runner instance.
*/
queryRunner: QueryRunner
/**
* EntityManager used in the event transaction.
* All database operations in the subscribed event listener should be performed using this entity manager instance.
*/
manager: EntityManager
/**
* Query that is being executed.
*/
query: string
/**
* Parameters used in the query.
*/
parameters?: any[]
}
export interface BeforeQueryEvent<Entity> extends QueryEvent<Entity> {}
export interface AfterQueryEvent<Entity> extends QueryEvent<Entity> {
/**
* Whether the query was successful.
*/
success: boolean
/**
* The duration of the query execution.
*/
executionTime?: number
/**
* The raw results from the database if the query was successful.
*/
rawResults?: any
/**
* The error thrown if the query was unsuccessful.
*/
error?: any
}

View File

@ -14,6 +14,6 @@ export class Address {
@Column()
address: string
@ManyToOne(() => User)
@ManyToOne(() => User, (u) => u.addresses)
user: User
}

View File

@ -0,0 +1,12 @@
import { Column } from "../../../../src/decorator/columns/Column"
import { PrimaryColumn } from "../../../../src/decorator/columns/PrimaryColumn"
import { Entity } from "../../../../src/decorator/entity/Entity"
@Entity()
export class Post {
@PrimaryColumn()
id: number
@Column()
title: string
}

View File

@ -0,0 +1,55 @@
import "reflect-metadata"
import appRootPath from "app-root-path"
import sinon from "sinon"
import { DataSource } from "../../../src"
import {
createTestingConnections,
reloadTestingDatabases,
closeTestingConnections,
} from "../../utils/test-utils"
import { PlatformTools } from "../../../src/platform/PlatformTools"
describe("github issues > #3302 Tracking query time for slow queries and statsd timers", () => {
let connections: DataSource[]
let stub: sinon.SinonStub
let sandbox: sinon.SinonSandbox
const beforeQueryLogPath = appRootPath + "/before-query.log"
const afterQueryLogPath = appRootPath + "/after-query.log"
before(async () => {
connections = await createTestingConnections({
entities: [__dirname + "/entity/*{.js,.ts}"],
subscribers: [__dirname + "/subscriber/*{.js,.ts}"],
})
sandbox = sinon.createSandbox()
stub = sandbox.stub(PlatformTools, "appendFileSync")
})
beforeEach(() => reloadTestingDatabases(connections))
afterEach(async () => {
stub.resetHistory()
sandbox.restore()
await closeTestingConnections(connections)
})
it("if query executed, should write query to file", async () =>
Promise.all(
connections.map(async (connection) => {
const testQuery = `SELECT COUNT(*) FROM ${connection.driver.escape(
"post",
)}`
await connection.query(testQuery)
sinon.assert.calledWith(
stub,
beforeQueryLogPath,
sinon.match(testQuery),
)
sinon.assert.calledWith(
stub,
afterQueryLogPath,
sinon.match(testQuery),
)
}),
))
})

View File

@ -0,0 +1,28 @@
import { Post } from "../entity/Post"
import { EntitySubscriberInterface, EventSubscriber } from "../../../../src"
import {
AfterQueryEvent,
BeforeQueryEvent,
} from "../../../../src/subscriber/event/QueryEvent"
import { PlatformTools } from "../../../../src/platform/PlatformTools"
import appRootPath from "app-root-path"
@EventSubscriber()
export class PostSubscriber implements EntitySubscriberInterface<Post> {
listenTo() {
return Post
}
beforeQuery(event: BeforeQueryEvent<Post>): void | Promise<any> {
PlatformTools.appendFileSync(
appRootPath.path + "/before-query.log",
event.query,
)
}
afterQuery(event: AfterQueryEvent<Post>): void | Promise<any> {
PlatformTools.appendFileSync(
appRootPath.path + "/after-query.log",
event.query,
)
}
}

View File

@ -13,6 +13,7 @@ import { PlatformTools } from "../../../src/platform/PlatformTools"
describe("github issues > #4410 allow custom filepath for FileLogger", () => {
let connections: DataSource[]
let stub: sinon.SinonStub
let sandbox: sinon.SinonSandbox
const testingOptions: TestingOptions = {
entities: [Username],
@ -20,10 +21,14 @@ describe("github issues > #4410 allow custom filepath for FileLogger", () => {
dropSchema: true,
}
before(() => (stub = sinon.stub(PlatformTools, "appendFileSync")))
before(() => {
sandbox = sinon.createSandbox()
stub = sandbox.stub(PlatformTools, "appendFileSync")
})
beforeEach(() => reloadTestingDatabases(connections))
afterEach(async () => {
stub.resetHistory()
await closeTestingConnections(connections)
})