implemented broadcaster and event sending on load/persist/remove

This commit is contained in:
Umed Khudoiberdiev 2016-03-21 16:14:17 +05:00
parent cd9ff1494a
commit 447c145589
17 changed files with 345 additions and 134 deletions

View File

@ -0,0 +1,69 @@
import {createMysqlConnection} from "../../src/typeorm";
import {Post} from "./entity/Post";
import {PostCategory} from "./entity/PostCategory";
import {PostAuthor} from "./entity/PostAuthor";
// first create a connection
let options = {
host: "192.168.99.100",
port: 3306,
username: "root",
password: "admin",
database: "test",
autoSchemaCreate: true
};
createMysqlConnection(options, [__dirname + "/entity"], [__dirname + "/subscriber"]).then(connection => {
let category1 = new PostCategory();
category1.name = "post category #1";
let category2 = new PostCategory();
category2.name = "post category #2";
let author = new PostAuthor();
author.name = "Umed";
let post = new Post();
post.text = "Hello how are you?";
post.title = "hello";
post.categories.push(category1, category2);
post.author = author;
let postRepository = connection.getRepository(Post);
postRepository
.persist(post)
.then(post => {
console.log("Post has been saved");
return postRepository.findById(post.id);
})
.then(loadedPost => {
console.log("---------------------------");
console.log("post is loaded. Lets now load it with relations.");
return postRepository
.createQueryBuilder("p")
.leftJoinAndSelect("p.author", "author")
.leftJoinAndSelect("p.categories", "categories")
.where("p.id = :id", { id: loadedPost.id })
.getSingleResult();
})
.then(loadedPost => {
console.log("---------------------------");
console.log("load finished. Now lets update entity");
loadedPost.text = "post updated";
loadedPost.author.name = "Bakha";
return postRepository.persist(loadedPost);
})
.then(loadedPost => {
console.log("---------------------------");
console.log("update finished. Now lets remove entity");
return postRepository.remove(loadedPost);
})
.then(loadedPost => {
console.log("---------------------------");
console.log("load removed.");
})
.catch(error => console.log("Cannot save. Error: ", error.stack ? error.stack : error));
}, error => console.log("Cannot connect: ", error.stack ? error.stack : error));

View File

@ -0,0 +1,35 @@
import {PrimaryColumn, Column} from "../../../src/decorator/Columns";
import {Table} from "../../../src/decorator/Tables";
import {ManyToMany} from "../../../src/decorator/Relations";
import {PostCategory} from "./PostCategory";
import {PostAuthor} from "./PostAuthor";
import {OneToMany} from "../../../src/decorator/relations/OneToMany";
import {ManyToOne} from "../../../src/decorator/relations/ManyToOne";
@Table("sample5_post")
export class Post {
@PrimaryColumn("int", { autoIncrement: true })
id: number;
@Column()
title: string;
@Column()
text: string;
@ManyToOne(type => PostAuthor, post => post.posts, {
cascadeInsert: true,
cascadeUpdate: true,
cascadeRemove: true
})
author: PostAuthor;
@ManyToMany(type => PostCategory, category => category.posts, {
cascadeInsert: true,
cascadeUpdate: true,
cascadeRemove: true
})
categories: PostCategory[] = [];
}

View File

@ -0,0 +1,18 @@
import {PrimaryColumn, Column} from "../../../src/decorator/Columns";
import {Table} from "../../../src/decorator/Tables";
import {Post} from "./Post";
import {OneToMany} from "../../../src/decorator/relations/OneToMany";
@Table("sample5_post_author")
export class PostAuthor {
@PrimaryColumn("int", { autoIncrement: true })
id: number;
@Column()
name: string;
@OneToMany(type => Post, post => post.author)
posts: Post[];
}

View File

@ -0,0 +1,22 @@
import {PrimaryColumn, Column} from "../../../src/decorator/Columns";
import {Table} from "../../../src/decorator/Tables";
import {Post} from "./Post";
import {ManyToManyInverse} from "../../../src/decorator/relations/ManyToManyInverse";
@Table("sample5_post_category")
export class PostCategory {
@PrimaryColumn("int", { autoIncrement: true })
id: number;
@Column()
name: string;
@ManyToManyInverse(type => Post, post => post.categories, {
cascadeInsert: true,
cascadeUpdate: true,
cascadeRemove: true
})
posts: Post[] = [];
}

View File

@ -0,0 +1,59 @@
import {OrmEventSubscriber} from "../../../src/decorator/OrmEventSubscriber";
import {OrmSubscriber} from "../../../src/subscriber/OrmSubscriber";
import {InsertEvent} from "../../../src/subscriber/event/InsertEvent";
import {RemoveEvent} from "../../../src/subscriber/event/RemoveEvent";
import {UpdateEvent} from "../../../src/subscriber/event/UpdateEvent";
@OrmEventSubscriber()
export class EverythingSubscriber implements OrmSubscriber<any> {
/**
* Called after entity insertion.
*/
beforeInsert(event: InsertEvent<any>) {
console.log(`BEFORE ENTITY INSERTED: `, event.entity);
}
/**
* Called after entity insertion.
*/
beforeUpdate(event: UpdateEvent<any>) {
console.log(`BEFORE ENTITY UPDATED: `, event.entity);
}
/**
* Called after entity insertion.
*/
beforeRemove(event: RemoveEvent<any>) {
console.log(`BEFORE ENTITY WITH ID ${event.entityId} REMOVED: `, event.entity);
}
/**
* Called after entity insertion.
*/
afterInsert(event: InsertEvent<any>) {
console.log(`AFTER ENTITY INSERTED: `, event.entity);
}
/**
* Called after entity insertion.
*/
afterUpdate(event: UpdateEvent<any>) {
console.log(`AFTER ENTITY UPDATED: `, event.entity);
}
/**
* Called after entity insertion.
*/
afterRemove(event: RemoveEvent<any>) {
console.log(`AFTER ENTITY WITH ID ${event.entityId} REMOVED: `, event.entity);
}
/**
* Called after entity is loaded.
*/
afterLoad(entity: any) {
console.log(`AFTER ENTITY LOADED: `, entity);
}
}

View File

@ -2,7 +2,6 @@ import {Driver} from "../driver/Driver";
import {ConnectionOptions} from "./ConnectionOptions";
import {Repository} from "../repository/Repository";
import {OrmSubscriber} from "../subscriber/OrmSubscriber";
import {OrmBroadcaster} from "../subscriber/OrmBroadcaster";
import {RepositoryNotFoundError} from "./error/RepositoryNotFoundError";
import {BroadcasterNotFoundError} from "./error/BroadcasterNotFoundError";
import {EntityMetadata} from "../metadata-builder/metadata/EntityMetadata";
@ -28,7 +27,6 @@ export class Connection {
private _driver: Driver;
private _metadatas: EntityMetadata[] = [];
private _subscribers: OrmSubscriber<any>[] = [];
private _broadcasters: OrmBroadcaster<any>[] = [];
private repositoryAndMetadatas: RepositoryAndMetadata[] = [];
private _options: ConnectionOptions;
@ -67,13 +65,6 @@ export class Connection {
return this._subscribers;
}
/**
* All broadcasters that are registered for this connection.
*/
get broadcasters(): OrmBroadcaster<any>[] {
return this._broadcasters;
}
/**
* All metadatas that are registered for this connection.
*/
@ -125,7 +116,6 @@ export class Connection {
*/
addMetadatas(metadatas: EntityMetadata[]) {
this._metadatas = this._metadatas.concat(metadatas);
this._broadcasters = this._broadcasters.concat(metadatas.map(metadata => this.createBroadcasterForMetadata(metadata)));
this.repositoryAndMetadatas = this.repositoryAndMetadatas.concat(metadatas.map(metadata => this.createRepoMeta(metadata)));
}
@ -159,29 +149,14 @@ export class Connection {
return metadata;
}
/**
* Gets the broadcaster for the given entity class.
*/
getBroadcaster<Entity>(entityClass: ConstructorFunction<Entity>): OrmBroadcaster<Entity> {
const metadata = this.broadcasters.find(broadcaster => broadcaster.entityClass === entityClass);
if (!metadata)
throw new BroadcasterNotFoundError(entityClass);
return metadata;
}
// -------------------------------------------------------------------------
// Private Methods
// -------------------------------------------------------------------------
private createBroadcasterForMetadata(metadata: EntityMetadata): OrmBroadcaster<any> {
return new OrmBroadcaster<any>(this.subscribers, metadata.target);
}
private createRepoMeta(metadata: EntityMetadata): RepositoryAndMetadata {
return {
metadata: metadata,
repository: new Repository<any>(this, metadata, this.getBroadcaster(<any> metadata.target))
repository: new Repository<any>(this, metadata)
};
}

View File

@ -104,7 +104,7 @@ export class ConnectionManager {
const allSubscriberClasses = importClassesFromDirectories(paths);
const subscribers = defaultMetadataStorage
.findOrmEventSubscribersForClasses(allSubscriberClasses)
.map(metadata => this.createContainerInstance(metadata.constructor));
.map(metadata => this.createContainerInstance(metadata.target));
this.getConnection(connectionName).addSubscribers(subscribers);
}

View File

@ -115,7 +115,6 @@ export class MetadataStorage {
// -------------------------------------------------------------------------
findOrmEventSubscribersForClasses(classes: Function[]): OrmEventSubscriberMetadata[] {
// todo: didn't checked. Check if is working. Maybe dont need to use target and use constructor somehow?
return this.ormEventSubscriberMetadatas.filter(metadata => classes.indexOf(metadata.target) !== -1);
}

View File

@ -142,7 +142,8 @@ export class EntityPersistOperationBuilder {
return operations;
} else if (diff.length) {
operations.push(new UpdateOperation(newEntity, diff));
const entityId = newEntity[metadata.primaryColumn.name];
operations.push(new UpdateOperation(newEntity, entityId, diff));
}
metadata.relations
@ -189,7 +190,7 @@ export class EntityPersistOperationBuilder {
return operations; // looks like object is removed here, but cascades are not allowed - then we should stop iteration
} else if (isObjectRemoved) { // object is remove and cascades are allow here
operations.push(new RemoveOperation(dbEntity, fromMetadata, fromRelation, fromEntityId));
operations.push(new RemoveOperation(dbEntity, relationId, fromMetadata, fromRelation, fromEntityId));
}
metadata.relations

View File

@ -6,6 +6,7 @@ import {JunctionInsertOperation} from "./operation/JunctionInsertOperation";
import {InsertOperation} from "./operation/InsertOperation";
import {JunctionRemoveOperation} from "./operation/JunctionRemoveOperation";
import {UpdateByRelationOperation} from "./operation/UpdateByRelationOperation";
import {OrmBroadcaster} from "../subscriber/OrmBroadcaster";
/**
* Executes PersistOperation in the given connection.
@ -27,7 +28,10 @@ export class PersistOperationExecutor {
* Executes given persist operation.
*/
executePersistOperation(persistOperation: PersistOperation) {
const broadcaster = new OrmBroadcaster(this.connection);
return Promise.resolve()
.then(() => this.broadcastBeforeEvents(broadcaster, persistOperation))
.then(() => this.connection.driver.beginTransaction())
.then(() => this.executeInsertOperations(persistOperation))
.then(() => this.executeInsertJunctionsOperations(persistOperation))
@ -38,13 +42,60 @@ export class PersistOperationExecutor {
.then(() => this.executeRemoveOperations(persistOperation))
.then(() => this.connection.driver.endTransaction())
.then(() => this.updateIdsOfInsertedEntities(persistOperation))
.then(() => this.updateIdsOfRemovedEntities(persistOperation));
.then(() => this.updateIdsOfRemovedEntities(persistOperation))
.then(() => this.broadcastAfterEvents(broadcaster, persistOperation));
}
// -------------------------------------------------------------------------
// Private Methods
// -------------------------------------------------------------------------
/**
* Broadcast all before persistment events - beforeInsert, beforeUpdate and beforeRemove events.
*/
private broadcastBeforeEvents(broadcaster: OrmBroadcaster, persistOperation: PersistOperation) {
const insertEvents = persistOperation.inserts.map(insertOperation => {
const persistedEntity = persistOperation.allPersistedEntities.find(e => e.id === insertOperation.entityId);
return broadcaster.broadcastBeforeInsertEvent(persistedEntity);
});
const updateEvents = persistOperation.updates.map(updateOperation => {
const persistedEntity = persistOperation.allPersistedEntities.find(e => e.id === updateOperation.entityId);
return broadcaster.broadcastBeforeUpdateEvent(persistedEntity, updateOperation.columns);
});
const removeEvents = persistOperation.removes.map(removeOperation => {
const persistedEntity = persistOperation.allPersistedEntities.find(e => e.id === removeOperation.entityId);
return broadcaster.broadcastBeforeRemoveEvent(persistedEntity, removeOperation.entityId);
});
return Promise.all(insertEvents)
.then(() => Promise.all(updateEvents))
.then(() => Promise.all(removeEvents)); // todo: do we really should send it in order?
}
/**
* Broadcast all after persistment events - afterInsert, afterUpdate and afterRemove events.
*/
private broadcastAfterEvents(broadcaster: OrmBroadcaster, persistOperation: PersistOperation) {
const insertEvents = persistOperation.inserts.map(insertOperation => {
const persistedEntity = persistOperation.allPersistedEntities.find(e => e.id === insertOperation.entityId);
return broadcaster.broadcastAfterInsertEvent(persistedEntity);
});
const updateEvents = persistOperation.updates.map(updateOperation => {
const persistedEntity = persistOperation.allPersistedEntities.find(e => e.id === updateOperation.entityId);
return broadcaster.broadcastAfterUpdateEvent(persistedEntity, updateOperation.columns);
});
const removeEvents = persistOperation.removes.map(removeOperation => {
const persistedEntity = persistOperation.allPersistedEntities.find(e => e.id === removeOperation.entityId);
return broadcaster.broadcastAfterRemoveEvent(persistedEntity, removeOperation.entityId);
});
return Promise.all(insertEvents)
.then(() => Promise.all(updateEvents))
.then(() => Promise.all(removeEvents)); // todo: do we really should send it in order?
}
/**
* Executes insert operations.
*/
@ -127,8 +178,6 @@ export class PersistOperationExecutor {
* Removes all ids of the removed entities.
*/
private updateIdsOfRemovedEntities(persistOperation: PersistOperation) {
// console.log("OPERATION REMOVES: ", persistOperation.removes);
// console.log("ALL NEW ENTITIES: ", persistOperation.allNewEntities);
persistOperation.removes.forEach(removeOperation => {
const metadata = this.connection.getMetadata(removeOperation.entity.constructor);
const removedEntity = persistOperation.allPersistedEntities.find(allNewEntity => {

View File

@ -3,6 +3,7 @@ import {EntityMetadata} from "../../metadata-builder/metadata/EntityMetadata";
export class RemoveOperation {
constructor(public entity: any,
public entityId: any,
public fromMetadata: EntityMetadata, //todo: use relation.metadata instead?
public relation: RelationMetadata,
public fromEntityId: any) {

View File

@ -2,6 +2,7 @@ import {ColumnMetadata} from "../../metadata-builder/metadata/ColumnMetadata";
export class UpdateOperation {
constructor(public entity: any,
public entityId: any,
public columns: ColumnMetadata[]) {
}
}

View File

@ -2,6 +2,7 @@ import {Alias} from "./alias/Alias";
import {AliasMap} from "./alias/AliasMap";
import {Connection} from "../connection/Connection";
import {RawSqlResultsToEntityTransformer} from "./transformer/RawSqlResultsToEntityTransformer";
import {OrmBroadcaster} from "../subscriber/OrmBroadcaster";
export interface Join {
alias: Alias;
@ -282,9 +283,14 @@ export class QueryBuilder<Entity> {
}
getResults(): Promise<Entity[]> {
const broadcaster = new OrmBroadcaster(this.connection);
return this.connection.driver
.query<any[]>(this.getSql())
.then(results => this.rawResultsToEntities(results));
.then(results => this.rawResultsToEntities(results))
.then(results => {
broadcaster.broadcastLoadEventsForAll(results);
return results;
});
}
getSingleResult(): Promise<Entity> {

View File

@ -1,6 +1,5 @@
import {Connection} from "../connection/Connection";
import {EntityMetadata} from "../metadata-builder/metadata/EntityMetadata";
import {OrmBroadcaster} from "../subscriber/OrmBroadcaster";
import {QueryBuilder} from "../query-builder/QueryBuilder";
import {PlainObjectToNewEntityTransformer} from "../query-builder/transformer/PlainObjectToNewEntityTransformer";
import {PlainObjectToDatabaseEntityTransformer} from "../query-builder/transformer/PlainObjectToDatabaseEntityTransformer";
@ -14,35 +13,11 @@ import {PersistOperationExecutor} from "../persistment/PersistOperationExecutor"
*/
export class Repository<Entity> {
// -------------------------------------------------------------------------
// Properties
// -------------------------------------------------------------------------
/**
* Connection used by this repository.
*/
private connection: Connection;
/**
* Entity metadata of the table with which this repository is working.
*/
private metadata: EntityMetadata;
/**
* Broadcaster used to broadcast this repository events.
*/
private broadcaster: OrmBroadcaster<Entity>;
// -------------------------------------------------------------------------
// Constructor
// -------------------------------------------------------------------------
constructor(connection: Connection,
metadata: EntityMetadata,
broadcaster: OrmBroadcaster<Entity>) {
this.connection = connection;
this.metadata = metadata;
this.broadcaster = broadcaster;
constructor(private connection: Connection, private metadata: EntityMetadata) {
}
// -------------------------------------------------------------------------

View File

@ -1,99 +1,87 @@
import {UpdateEvent} from "./event/UpdateEvent";
import {RemoveEvent} from "./event/RemoveEvent";
import {InsertEvent} from "./event/InsertEvent";
import {OrmSubscriber} from "./OrmSubscriber";
import {Connection} from "../connection/Connection";
import {ColumnMetadata} from "../metadata-builder/metadata/ColumnMetadata";
/**
* Broadcaster provides a helper methods to broadcast events to the subscribers.
*/
export class OrmBroadcaster<Entity> {
// -------------------------------------------------------------------------
// Properties
// -------------------------------------------------------------------------
private subscribers: OrmSubscriber<Entity|any>[];
private _entityClass: Function;
export class OrmBroadcaster {
// -------------------------------------------------------------------------
// Constructor
// -------------------------------------------------------------------------
constructor(subscribers: OrmSubscriber<Entity|any>[], entityClass: Function) {
this.subscribers = subscribers;
this._entityClass = entityClass;
constructor(private connection: Connection) {
}
// -------------------------------------------------------------------------
// Accessors
// -------------------------------------------------------------------------
get entityClass(): Function {
return this._entityClass;
}
// -------------------------------------------------------------------------
// Public Methods
// -------------------------------------------------------------------------
broadcastBeforeInsert(event: InsertEvent<Entity>) {
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
broadcastBeforeInsertEvent(entity: any) {
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.beforeInsert)
.forEach(subscriber => subscriber.beforeInsert(event));
.forEach(subscriber => subscriber.beforeInsert({ entity: entity }));
}
broadcastAfterInsert(event: InsertEvent<Entity>) {
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
.filter(subscriber => !!subscriber.afterInsert)
.forEach(subscriber => subscriber.afterInsert(event));
}
broadcastBeforeUpdate(event: UpdateEvent<Entity>) {
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
broadcastBeforeUpdateEvent(entity: any, updatedColumns: ColumnMetadata[]) {
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.beforeUpdate)
.forEach(subscriber => subscriber.beforeUpdate(event));
.forEach(subscriber => subscriber.beforeUpdate({ entity: entity, updatedColumns: updatedColumns }));
}
broadcastAfterUpdate(event: UpdateEvent<Entity>) {
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
.filter(subscriber => !!subscriber.afterUpdate)
.forEach(subscriber => subscriber.afterUpdate(event));
}
broadcastAfterRemove(event: RemoveEvent<Entity>) {
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
.filter(subscriber => !!subscriber.afterRemove)
.forEach(subscriber => subscriber.afterRemove(event));
}
broadcastBeforeRemove(event: RemoveEvent<Entity>) {
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
broadcastBeforeRemoveEvent(entity: any, entityId: any) {
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.beforeRemove)
.forEach(subscriber => subscriber.beforeRemove(event));
.forEach(subscriber => subscriber.beforeRemove({ entity: entity, entityId: entityId }));
}
broadcastAfterLoadedAll(entities: Entity[]) {
if (!entities || entities.length) return;
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
.filter(subscriber => !!subscriber.afterLoad)
.forEach(subscriber => {
entities.forEach(entity => subscriber.afterLoad(entity));
});
broadcastAfterInsertEvent(entity: any) {
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.afterInsert)
.forEach(subscriber => subscriber.afterInsert({ entity: entity }));
}
broadcastAfterLoaded(entity: Entity) {
if (!entity) return;
broadcastAfterUpdateEvent(entity: any, updatedColumns: ColumnMetadata[]) {
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.afterUpdate)
.forEach(subscriber => subscriber.afterUpdate({ entity: entity, updatedColumns: updatedColumns }));
}
this.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber))
broadcastAfterRemoveEvent(entity: any, entityId: any) {
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.afterRemove)
.forEach(subscriber => subscriber.afterRemove({ entity: entity, entityId: entityId }));
}
broadcastLoadEventsForAll(entities: any[]) {
entities.forEach(entity => this.broadcastLoadEvents(entity));
}
broadcastLoadEvents(entity: any) {
const metadata = this.connection.getMetadata(entity.constructor);
metadata
.relations
.filter(relation => entity.hasOwnProperty(relation.propertyName))
.map(relation => entity[relation.propertyName])
.forEach(value => value instanceof Array ? this.broadcastLoadEventsForAll(value) : this.broadcastLoadEvents(value));
this.connection
.subscribers
.filter(subscriber => this.isAllowedSubscribers(subscriber, entity))
.filter(subscriber => !!subscriber.afterLoad)
.forEach(subscriber => subscriber.afterLoad(entity));
}
@ -102,8 +90,11 @@ export class OrmBroadcaster<Entity> {
// Private Methods
// -------------------------------------------------------------------------
private isAllowedSubscribers(subscriber: OrmSubscriber<Entity|any>) {
return !subscriber.listenTo() || subscriber.listenTo() === Object || subscriber.listenTo() === this._entityClass;
private isAllowedSubscribers(subscriber: OrmSubscriber<any>, cls: Function) {
return !subscriber.listenTo ||
!subscriber.listenTo() ||
subscriber.listenTo() === Object ||
subscriber.listenTo() === cls;
}
}

View File

@ -10,7 +10,7 @@ export interface OrmSubscriber<Entity> {
/**
* Returns the class of the entity to which events will listen.
*/
listenTo(): Function;
listenTo?(): Function;
/**
* Called after entity is loaded.

View File

@ -1,10 +1,20 @@
import {ColumnMetadata} from "../../metadata-builder/metadata/ColumnMetadata";
/**
* This event is used on update events.
*/
export interface UpdateEvent<Entity> {
// todo: will we send an entity changeset ?
entity?: Entity;
// todo: send old and new update values
/**
* Updated entity.
*/
entity: Entity;
/**
* List of columns that were updated.
*/
updatedColumns: ColumnMetadata[];
}