added stream method to query builder and drivers

This commit is contained in:
Umed Khudoiberdiev 2017-06-21 18:41:11 +05:00
parent a0dea7ec45
commit 92411f8039
10 changed files with 175 additions and 18 deletions

View File

@ -38,6 +38,7 @@ import {
} from "./typings";
import {Connection} from "../../connection/Connection";
import {EntityManager} from "../../entity-manager/EntityManager";
import {ReadStream} from "fs";
/**
* Runs queries on a single MongoDB connection.
@ -384,6 +385,13 @@ export class MongoQueryRunner implements QueryRunner {
throw new Error(`Executing SQL query is not supported by MongoDB driver.`);
}
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
throw new Error(`Stream is not supported by MongoDB driver.`);
}
/**
* Insert a new row with given values into the given table.
* Returns value of inserted object id.

View File

@ -12,6 +12,7 @@ import {QueryRunnerAlreadyReleasedError} from "../../query-runner/error/QueryRun
import {MysqlDriver} from "./MysqlDriver";
import {Connection} from "../../connection/Connection";
import {EntityManager} from "../../entity-manager/EntityManager";
import {ReadStream} from "fs";
/**
* Runs queries on a single mysql database connection.
@ -155,8 +156,8 @@ export class MysqlQueryRunner implements QueryRunner {
throw new QueryRunnerAlreadyReleasedError();
return new Promise(async (ok, fail) => {
this.driver.connection.logger.logQuery(query, parameters, this);
const databaseConnection = await this.connect();
this.driver.connection.logger.logQuery(query, parameters, this);
databaseConnection.query(query, parameters, (err: any, result: any) => {
if (err) {
this.driver.connection.logger.logFailedQuery(query, parameters, this);
@ -169,6 +170,23 @@ export class MysqlQueryRunner implements QueryRunner {
});
}
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();
return new Promise(async (ok, fail) => {
const databaseConnection = await this.connect();
this.driver.connection.logger.logQuery(query, parameters, this);
const stream = databaseConnection.query(query, parameters);
if (onEnd) stream.on("end", onEnd);
if (onError) stream.on("error", onError);
ok(stream);
});
}
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.

View File

@ -12,6 +12,7 @@ import {QueryRunnerAlreadyReleasedError} from "../../query-runner/error/QueryRun
import {OracleDriver} from "./OracleDriver";
import {EntityManager} from "../../entity-manager/EntityManager";
import {Connection} from "../../connection/Connection";
import {ReadStream} from "fs";
/**
* Runs queries on a single oracle database connection.
@ -187,6 +188,13 @@ export class OracleQueryRunner implements QueryRunner {
});
}
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
throw new Error(`Stream is not supported by Oracle driver.`);
}
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.

View File

@ -143,7 +143,7 @@ export class PostgresDriver implements Driver {
}
// -------------------------------------------------------------------------
// Public Methods
// Public Implemented Methods
// -------------------------------------------------------------------------
/**
@ -377,6 +377,22 @@ export class PostgresDriver implements Driver {
}
}
// -------------------------------------------------------------------------
// Public Methods
// -------------------------------------------------------------------------
/**
* Loads postgres query stream package.
*/
loadStreamDependency() {
try {
return PlatformTools.load("pg-query-stream");
} catch (e) { // todo: better error for browser env
throw new Error(`To use streams you should install pg-query-stream package. Please run npm i pg-query-stream --save command.`);
}
}
// -------------------------------------------------------------------------
// Protected Methods
// -------------------------------------------------------------------------

View File

@ -12,6 +12,7 @@ import {QueryRunnerAlreadyReleasedError} from "../../query-runner/error/QueryRun
import {PostgresDriver} from "./PostgresDriver";
import {EntityManager} from "../../entity-manager/EntityManager";
import {Connection} from "../../connection/Connection";
import {ReadStream} from "fs";
/**
* Runs queries on a single postgres database connection.
@ -180,8 +181,8 @@ export class PostgresQueryRunner implements QueryRunner {
// console.log("query: ", query);
// console.log("parameters: ", parameters);
return new Promise<any[]>(async (ok, fail) => {
this.driver.connection.logger.logQuery(query, parameters, this);
const databaseConnection = await this.connect();
this.driver.connection.logger.logQuery(query, parameters, this);
databaseConnection.query(query, parameters, (err: any, result: any) => {
if (err) {
this.driver.connection.logger.logFailedQuery(query, parameters, this);
@ -194,6 +195,24 @@ export class PostgresQueryRunner implements QueryRunner {
});
}
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
const QueryStream = this.driver.loadStreamDependency();
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();
return new Promise(async (ok, fail) => {
const databaseConnection = await this.connect();
this.driver.connection.logger.logQuery(query, parameters, this);
const stream = databaseConnection.query(new QueryStream(query, parameters));
if (onEnd) stream.on("end", onEnd);
if (onError) stream.on("error", onError);
ok(stream);
});
}
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.

View File

@ -13,6 +13,8 @@ import {RandomGenerator} from "../../util/RandomGenerator";
import {SqliteDriver} from "./SqliteDriver";
import {EntityManager} from "../../entity-manager/EntityManager";
import {Connection} from "../../connection/Connection";
import {ReadStream} from "fs";
import {Readable} from "stream";
/**
* Runs queries on a single sqlite database connection.
@ -133,8 +135,8 @@ export class SqliteQueryRunner implements QueryRunner {
throw new QueryRunnerAlreadyReleasedError();
return new Promise<any[]>(async (ok, fail) => {
this.driver.connection.logger.logQuery(query, parameters, this);
const databaseConnection = await this.connect();
this.driver.connection.logger.logQuery(query, parameters, this);
databaseConnection.all(query, parameters, (err: any, result: any) => {
if (err) {
this.driver.connection.logger.logFailedQuery(query, parameters, this);
@ -147,6 +149,13 @@ export class SqliteQueryRunner implements QueryRunner {
});
}
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
throw new Error(`Stream is not supported by sqlite driver.`);
}
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.

View File

@ -12,6 +12,7 @@ import {QueryRunnerAlreadyReleasedError} from "../../query-runner/error/QueryRun
import {SqlServerDriver} from "./SqlServerDriver";
import {EntityManager} from "../../entity-manager/EntityManager";
import {Connection} from "../../connection/Connection";
import {ReadStream} from "fs";
/**
* Runs queries on a single mysql database connection.
@ -219,6 +220,63 @@ export class SqlServerQueryRunner implements QueryRunner {
return promise;
}
/**
* Returns raw data stream.
*/
async stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();
let waitingOkay: Function;
const waitingPromise = new Promise((ok) => waitingOkay = ok);
if (this.queryResponsibilityChain.length) {
const otherWaitingPromises = [...this.queryResponsibilityChain];
this.queryResponsibilityChain.push(waitingPromise);
await Promise.all(otherWaitingPromises);
}
const promise = new Promise<ReadStream>(async (ok, fail) => {
this.driver.connection.logger.logQuery(query, parameters, this);
const request = new this.driver.mssql.Request(this.isTransactionActive ? this.databaseConnection : this.driver.connectionPool);
request.stream = true;
if (parameters && parameters.length) {
parameters.forEach((parameter, index) => {
request.input(index, parameters![index]);
});
}
request.query(query, (err: any, result: any) => {
const resolveChain = () => {
if (promiseIndex !== -1)
this.queryResponsibilityChain.splice(promiseIndex, 1);
if (waitingPromiseIndex !== -1)
this.queryResponsibilityChain.splice(waitingPromiseIndex, 1);
waitingOkay();
};
let promiseIndex = this.queryResponsibilityChain.indexOf(promise);
let waitingPromiseIndex = this.queryResponsibilityChain.indexOf(waitingPromise);
if (err) {
this.driver.connection.logger.logFailedQuery(query, parameters, this);
this.driver.connection.logger.logQueryError((err.originalError && err.originalError.info) ? err.originalError.info.message : err, this);
resolveChain();
return fail(err);
}
ok(result.recordset);
resolveChain();
});
if (onEnd) request.on("done", onEnd);
if (onError) request.on("error", onError);
ok(request as ReadStream);
});
if (this.isTransactionActive)
this.queryResponsibilityChain.push(promise);
return promise;
}
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.

View File

@ -11,6 +11,7 @@ import {QueryRunnerAlreadyReleasedError} from "../../query-runner/error/QueryRun
import {WebsqlDriver} from "./WebsqlDriver";
import {Connection} from "../../connection/Connection";
import {EntityManager} from "../../entity-manager/EntityManager";
import {ReadStream} from "fs";
/**
* Declare a global function that is only available in browsers that support WebSQL.
@ -195,6 +196,13 @@ export class WebsqlQueryRunner implements QueryRunner {
});
}
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
throw new Error(`Stream is not supported by websqlite driver.`);
}
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.

View File

@ -1,36 +1,23 @@
import {RawSqlResultsToEntityTransformer} from "./transformer/RawSqlResultsToEntityTransformer";
import {EntityMetadata} from "../metadata/EntityMetadata";
import {ObjectLiteral} from "../common/ObjectLiteral";
import {QueryRunner} from "../query-runner/QueryRunner";
import {SqlServerDriver} from "../driver/sqlserver/SqlServerDriver";
import {Connection} from "../connection/Connection";
import {JoinOptions} from "./JoinOptions";
import {PessimisticLockTransactionRequiredError} from "./error/PessimisticLockTransactionRequiredError";
import {NoVersionOrUpdateDateColumnError} from "./error/NoVersionOrUpdateDateColumnError";
import {OptimisticLockVersionMismatchError} from "./error/OptimisticLockVersionMismatchError";
import {OptimisticLockCanNotBeUsedError} from "./error/OptimisticLockCanNotBeUsedError";
import {PostgresDriver} from "../driver/postgres/PostgresDriver";
import {MysqlDriver} from "../driver/mysql/MysqlDriver";
import {LockNotSupportedOnGivenDriverError} from "./error/LockNotSupportedOnGivenDriverError";
import {ColumnMetadata} from "../metadata/ColumnMetadata";
import {JoinAttribute} from "./JoinAttribute";
import {RelationIdAttribute} from "./relation-id/RelationIdAttribute";
import {RelationCountAttribute} from "./relation-count/RelationCountAttribute";
import {QueryExpressionMap} from "./QueryExpressionMap";
import {SelectQuery} from "./SelectQuery";
import {RelationIdLoader} from "./relation-id/RelationIdLoader";
import {RelationIdLoadResult} from "./relation-id/RelationIdLoadResult";
import {RelationIdMetadataToAttributeTransformer} from "./relation-id/RelationIdMetadataToAttributeTransformer";
import {RelationCountLoadResult} from "./relation-count/RelationCountLoadResult";
import {RelationCountLoader} from "./relation-count/RelationCountLoader";
import {RelationCountMetadataToAttributeTransformer} from "./relation-count/RelationCountMetadataToAttributeTransformer";
import {OracleDriver} from "../driver/oracle/OracleDriver";
import {Broadcaster} from "../subscriber/Broadcaster";
import {UpdateQueryBuilder} from "./UpdateQueryBuilder";
import {DeleteQueryBuilder} from "./DeleteQueryBuilder";
import {InsertQueryBuilder} from "./InsertQueryBuilder";
import {QueryBuilder} from "./QueryBuilder";
import {ReadStream} from "fs";
/**
* Allows to build complex sql queries in a fashion way and execute those queries.
@ -711,6 +698,26 @@ export class SelectQueryBuilder<Entity> extends QueryBuilder<Entity> {
return result;
}
/**
* Returns raw data stream.
*/
async stream(): Promise<ReadStream> {
const [sql, parameters] = this.getSqlAndParameters();
try {
const stream = await this.queryRunner.stream(sql, parameters);
stream.on("end", () => {
if (this.ownQueryRunner) // means we created our own query runner
return this.queryRunner.release();
return;
});
return stream;
} finally {
if (this.ownQueryRunner) // means we created our own query runner
await this.queryRunner.release();
}
}
/**
* Enables special query builder options.
*

View File

@ -5,6 +5,7 @@ import {ForeignKeySchema} from "../schema-builder/schema/ForeignKeySchema";
import {IndexSchema} from "../schema-builder/schema/IndexSchema";
import {Connection} from "../connection/Connection";
import {EntityManager} from "../entity-manager/EntityManager";
import {ReadStream} from "fs";
/**
* Runs queries on a single database connection.
@ -75,6 +76,11 @@ export interface QueryRunner {
*/
query(query: string, parameters?: any[]): Promise<any>;
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream>; // todo: ReadStream gonna bring problems in websql driver
/**
* Insert a new row with given values into the given table.
* Returns value of the generated column if given and generate column exist in the table.