Commit 9180ecd1 authored by Christopher Mark Fullarton's avatar Christopher Mark Fullarton
Browse files

Adds rabbitmq as message broker for frontend and backend

parent 828cb5b0
...@@ -16,6 +16,15 @@ To send E-Mails it is required to provide additional variables: ...@@ -16,6 +16,15 @@ To send E-Mails it is required to provide additional variables:
- `ARSNOVA_CLICK_BACKEND_MAIL_FROM [string]`: The `from` header of the E-Mails - `ARSNOVA_CLICK_BACKEND_MAIL_FROM [string]`: The `from` header of the E-Mails
- `ARSNOVA_CLICK_BACKEND_MAIL_TO [string]`: The `to` header of the E-Mails - `ARSNOVA_CLICK_BACKEND_MAIL_TO [string]`: The `to` header of the E-Mails
###### RabbitMQ
The server uses RabbitMQ to send messages to the frontend. These variables can be adjusted to connect to the RabbitMQ server:
- `AMQP_PROTOCOL [string]`: Protocol for the connection (defaults to amqp)
- `AMQP_HOSTNAME [string]`: Hostname of the RabbitMQ Server (defaults to localhost)
- `AMQP_USER [string]`: The username to use for the connection (defaults to guest)
- `AMQP_PASSWORD [string]`: The username to use for the connection (defaults to guest)
As mentioned in the RabbitMQ installation guideline, the user should not be an management user!
###### Dumps ###### Dumps
The server will generate dumps if an Error is thrown. The server will generate dumps if an Error is thrown.
The dump will contain the serialized error and the state of the DAOs. The dump will contain the serialized error and the state of the DAOs.
......
import { Channel, connect, Connection } from 'amqplib';
import { settings } from '../statistics';
class AMQPConnector {
private static _instance: AMQPConnector;
private _channel: Channel;
get channel(): Channel {
return this._channel;
}
private _connection: Connection;
constructor() {
}
public static getInstance(): AMQPConnector {
if (!this._instance) {
this._instance = new AMQPConnector();
}
return this._instance;
}
public async initConnection(): Promise<void> {
this._connection = await connect({
protocol: settings.amqp.protocol,
hostname: settings.amqp.hostname,
username: settings.amqp.user,
password: settings.amqp.password,
});
this._channel = await this._connection.createChannel();
}
}
export default AMQPConnector.getInstance();
...@@ -2,7 +2,6 @@ import { ObjectId } from 'bson'; ...@@ -2,7 +2,6 @@ import { ObjectId } from 'bson';
import { MemberEntity } from '../entities/member/MemberEntity'; import { MemberEntity } from '../entities/member/MemberEntity';
import { QuizEntity } from '../entities/quiz/QuizEntity'; import { QuizEntity } from '../entities/quiz/QuizEntity';
import { DbCollection, DbEvent } from '../enums/DbOperation'; import { DbCollection, DbEvent } from '../enums/DbOperation';
import { IMemberEntity } from '../interfaces/entities/Member/IMemberEntity';
import { IMemberSerialized } from '../interfaces/entities/Member/IMemberSerialized'; import { IMemberSerialized } from '../interfaces/entities/Member/IMemberSerialized';
import { IQuizEntity } from '../interfaces/quizzes/IQuizEntity'; import { IQuizEntity } from '../interfaces/quizzes/IQuizEntity';
import { AbstractDAO } from './AbstractDAO'; import { AbstractDAO } from './AbstractDAO';
...@@ -83,7 +82,7 @@ class MemberDAO extends AbstractDAO<Array<MemberEntity>> { ...@@ -83,7 +82,7 @@ class MemberDAO extends AbstractDAO<Array<MemberEntity>> {
} }
} }
public getMembersOfQuiz(quizName: string): Array<IMemberEntity> { public getMembersOfQuiz(quizName: string): Array<MemberEntity> {
return this.storage.filter(val => !!val.currentQuizName.match(new RegExp(`^${RegExp.escape(quizName)}$`, 'i'))); return this.storage.filter(val => !!val.currentQuizName.match(new RegExp(`^${RegExp.escape(quizName)}$`, 'i')));
} }
......
...@@ -2,6 +2,7 @@ import * as mongoose from 'mongoose'; ...@@ -2,6 +2,7 @@ import * as mongoose from 'mongoose';
import { Connection } from 'mongoose'; import { Connection } from 'mongoose';
import { Database } from '../enums/DbOperation'; import { Database } from '../enums/DbOperation';
import LoggerService from '../services/LoggerService'; import LoggerService from '../services/LoggerService';
import AMQPConnector from './AMQPConnector';
class MongoDbConnector { class MongoDbConnector {
get dbName(): string { get dbName(): string {
...@@ -31,6 +32,10 @@ class MongoDbConnector { ...@@ -31,6 +32,10 @@ class MongoDbConnector {
resolve(db); resolve(db);
}); });
AMQPConnector.initConnection().then(() => {
AMQPConnector.channel.assertExchange('global', 'fanout');
});
await mongoose.connect(this._mongoURL, { await mongoose.connect(this._mongoURL, {
useCreateIndex: true, useCreateIndex: true,
autoIndex: true, autoIndex: true,
......
import { ObjectId } from 'bson'; import { ObjectId } from 'bson';
import WebSocket from 'ws';
import { MemberGroupEntity } from '../../entities/member/MemberGroupEntity'; import { MemberGroupEntity } from '../../entities/member/MemberGroupEntity';
import { getQuestionForType } from '../../entities/question/QuizValidator'; import { getQuestionForType } from '../../entities/question/QuizValidator';
import { QuizEntity } from '../../entities/quiz/QuizEntity'; import { QuizEntity } from '../../entities/quiz/QuizEntity';
...@@ -11,6 +10,7 @@ import { IQuizEntity, IQuizSerialized } from '../../interfaces/quizzes/IQuizEnti ...@@ -11,6 +10,7 @@ import { IQuizEntity, IQuizSerialized } from '../../interfaces/quizzes/IQuizEnti
import { generateToken } from '../../lib/generateToken'; import { generateToken } from '../../lib/generateToken';
import { setPath } from '../../lib/resolveNestedObjectProperty'; import { setPath } from '../../lib/resolveNestedObjectProperty';
import { AbstractDAO } from '../AbstractDAO'; import { AbstractDAO } from '../AbstractDAO';
import AMQPConnector from '../AMQPConnector';
import DbDAO from '../DbDAO'; import DbDAO from '../DbDAO';
import MemberDAO from '../MemberDAO'; import MemberDAO from '../MemberDAO';
...@@ -55,7 +55,7 @@ class QuizDAO extends AbstractDAO<Array<IQuizEntity>> { ...@@ -55,7 +55,7 @@ class QuizDAO extends AbstractDAO<Array<IQuizEntity>> {
public removeQuiz(id: ObjectId): void { public removeQuiz(id: ObjectId): void {
const removedQuiz = this.storage.splice(this.storage.findIndex(val => val.id.equals(id)), 1); const removedQuiz = this.storage.splice(this.storage.findIndex(val => val.id.equals(id)), 1);
removedQuiz[0].onRemove(); removedQuiz[0].onRemove();
removedQuiz[0].state = 0; removedQuiz[0].state = QuizState.Inactive;
MemberDAO.removeMembersOfQuiz(removedQuiz[0]); MemberDAO.removeMembersOfQuiz(removedQuiz[0]);
} }
...@@ -151,16 +151,13 @@ class QuizDAO extends AbstractDAO<Array<IQuizEntity>> { ...@@ -151,16 +151,13 @@ class QuizDAO extends AbstractDAO<Array<IQuizEntity>> {
} }
} }
public joinableQuizzesUpdated(): void {
this.updateEmitter.emit(DbEvent.Change, this.getJoinableQuizzes());
}
public async addQuiz(quizDoc: IQuizSerialized): Promise<IQuizEntity> { public async addQuiz(quizDoc: IQuizSerialized): Promise<IQuizEntity> {
if (this.getQuizByName(quizDoc.name)) { if (this.getQuizByName(quizDoc.name)) {
throw new Error(`Duplicate quiz insertion: ${quizDoc.name}`); throw new Error(`Duplicate quiz insertion: ${quizDoc.name}`);
} }
const entity = new QuizEntity(quizDoc); const entity = new QuizEntity(quizDoc);
await AMQPConnector.channel.assertExchange(`quiz_${encodeURI(entity.name)}`, 'fanout');
this.storage.push(entity); this.storage.push(entity);
return entity; return entity;
} }
...@@ -233,10 +230,6 @@ class QuizDAO extends AbstractDAO<Array<IQuizEntity>> { ...@@ -233,10 +230,6 @@ class QuizDAO extends AbstractDAO<Array<IQuizEntity>> {
return this.getActiveQuizzes().find(val => !!val.name.match(new RegExp(`^${RegExp.escape(quizName)}$`, 'i'))); return this.getActiveQuizzes().find(val => !!val.name.match(new RegExp(`^${RegExp.escape(quizName)}$`, 'i')));
} }
public getQuizBySocket(ws: WebSocket): IQuizEntity {
return this.storage.find(quiz => quiz.containsSocket(ws));
}
public getQuizByToken(token: string): IQuizEntity { public getQuizByToken(token: string): IQuizEntity {
return this.storage.find(quiz => quiz.privateKey === token); return this.storage.find(quiz => quiz.privateKey === token);
} }
......
import { ObjectId } from 'bson'; import { ObjectId } from 'bson';
import { DeleteWriteOpResultObject } from 'mongodb'; import { DeleteWriteOpResultObject } from 'mongodb';
import * as WebSocket from 'ws'; import AMQPConnector from '../../db/AMQPConnector';
import DbDAO from '../../db/DbDAO'; import DbDAO from '../../db/DbDAO';
import MemberDAO from '../../db/MemberDAO'; import MemberDAO from '../../db/MemberDAO';
import { DbCollection } from '../../enums/DbOperation'; import { DbCollection } from '../../enums/DbOperation';
...@@ -10,7 +10,6 @@ import { QuizState } from '../../enums/QuizState'; ...@@ -10,7 +10,6 @@ import { QuizState } from '../../enums/QuizState';
import { QuizVisibility } from '../../enums/QuizVisibility'; import { QuizVisibility } from '../../enums/QuizVisibility';
import { IQuizEntity, IQuizSerialized } from '../../interfaces/quizzes/IQuizEntity'; import { IQuizEntity, IQuizSerialized } from '../../interfaces/quizzes/IQuizEntity';
import { ISessionConfigurationEntity } from '../../interfaces/session_configuration/ISessionConfigurationEntity'; import { ISessionConfigurationEntity } from '../../interfaces/session_configuration/ISessionConfigurationEntity';
import { SendSocketMessageService } from '../../services/SendSocketMessageService';
import { AbstractEntity } from '../AbstractEntity'; import { AbstractEntity } from '../AbstractEntity';
import { MemberEntity } from '../member/MemberEntity'; import { MemberEntity } from '../member/MemberEntity';
import { MemberGroupEntity } from '../member/MemberGroupEntity'; import { MemberGroupEntity } from '../member/MemberGroupEntity';
...@@ -142,7 +141,8 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity { ...@@ -142,7 +141,8 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity {
private _dropEmptyQuizTimeout: any; private _dropEmptyQuizTimeout: any;
private _quizTimerInterval: any; private _quizTimerInterval: any;
private _quizTimer: number; private _quizTimer: number;
private _socketChannel: Array<WebSocket> = [];
private readonly _exchangeName: string;
constructor(quiz: IQuizSerialized) { constructor(quiz: IQuizSerialized) {
super(); super();
...@@ -159,87 +159,52 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity { ...@@ -159,87 +159,52 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity {
this._readingConfirmationRequested = !!quiz.readingConfirmationRequested; this._readingConfirmationRequested = !!quiz.readingConfirmationRequested;
this._visibility = quiz.visibility; this._visibility = quiz.visibility;
this._description = quiz.description; this._description = quiz.description;
this._exchangeName = encodeURI(`quiz_${quiz.name}`);
} }
public onMemberAdded(member: MemberEntity): void { public async onMemberAdded(member: MemberEntity): Promise<void> {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Added, step: MessageProtocol.Added,
payload: { member: member.serialize() }, payload: { member: member.serialize() },
})); })));
} }
public onMemberRemoved(member: MemberEntity): void { public async onMemberRemoved(member: MemberEntity): Promise<void> {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Removed, step: MessageProtocol.Removed,
payload: { name: member.name }, payload: { name: member.name },
})); })));
} }
public onRemove(): void { public onRemove(): void {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { MemberDAO.getMembersOfQuiz(this.name).forEach(member => {
AMQPConnector.channel.deleteQueue(encodeURI(`${member.currentQuizName}_${member.name}`));
});
AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Closed, step: MessageProtocol.Closed,
})); })));
} }
public reset(): void { public reset(): void {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Reset, step: MessageProtocol.Reset,
})); })));
clearTimeout(this._quizTimerInterval); clearTimeout(this._quizTimerInterval);
} }
public stop(): void { public stop(): void {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Stop, step: MessageProtocol.Stop,
})); })));
this.currentStartTimestamp = -1; this.currentStartTimestamp = -1;
clearTimeout(this._quizTimerInterval); clearTimeout(this._quizTimerInterval);
} }
public addSocketToChannel(socket: WebSocket): void {
if (this._socketChannel.find(value => value === socket)) {
console.error(`Cannot add socket to quiz channel ${this.name} since it is already added`);
return;
}
console.log(`Adding socket to quiz channel ${this.name}`);
this._socketChannel.push(socket);
clearTimeout(this._dropEmptyQuizTimeout);
this._dropEmptyQuizTimeout = null;
}
public removeSocketFromChannel(socket: WebSocket): void {
const index = this._socketChannel.findIndex(value => value === socket);
if (index === -1) {
console.log(`Cannot remove socket from quiz channel ${this.name} since it is not found`);
return;
}
console.log(`Removing socket from quiz channel ${this.name}`);
this._socketChannel.splice(index, 1);
if (!this._socketChannel.length) {
if (this._dropEmptyQuizTimeout !== null) {
clearTimeout(this._dropEmptyQuizTimeout);
}
this._dropEmptyQuizTimeout = setTimeout(() => {
if (!this._socketChannel.length) {
DbDAO.updateOne(DbCollection.Quizzes, { _id: this.id }, { state: QuizState.Inactive });
DbDAO.deleteMany(DbCollection.Members, { currentQuizName: this.name });
}
}, 300000); // 5 minutes
}
}
public containsSocket(socket: WebSocket): boolean {
return !!this._socketChannel.find(value => value === socket);
}
public addQuestion(question: AbstractQuestionEntity, index: number = -1): void { public addQuestion(question: AbstractQuestionEntity, index: number = -1): void {
if (index === -1 || index >= this.questionList.length) { if (index === -1 || index >= this.questionList.length) {
this.questionList.push(question); this.questionList.push(question);
...@@ -306,13 +271,13 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity { ...@@ -306,13 +271,13 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity {
DbDAO.updateOne(DbCollection.Quizzes, { _id: this.id }, { currentQuestionIndex: nextIndex }); DbDAO.updateOne(DbCollection.Quizzes, { _id: this.id }, { currentQuestionIndex: nextIndex });
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.NextQuestion, step: MessageProtocol.NextQuestion,
payload: { payload: {
nextQuestionIndex: nextIndex, nextQuestionIndex: nextIndex,
}, },
})); })));
return nextIndex; return nextIndex;
} }
...@@ -322,11 +287,11 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity { ...@@ -322,11 +287,11 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity {
} }
public startNextQuestion(): void { public startNextQuestion(): void {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Start, step: MessageProtocol.Start,
payload: {}, payload: {},
})); })));
this._quizTimer = this._questionList[this._currentQuestionIndex].timer; this._quizTimer = this._questionList[this._currentQuestionIndex].timer;
if (this._quizTimer <= 0) { if (this._quizTimer <= 0) {
...@@ -338,13 +303,13 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity { ...@@ -338,13 +303,13 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity {
} }
this._quizTimerInterval = setInterval(() => { this._quizTimerInterval = setInterval(() => {
this._quizTimer--; this._quizTimer--;
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.Countdown, step: MessageProtocol.Countdown,
payload: { payload: {
value: this._quizTimer, value: this._quizTimer,
}, },
})); })));
if (this._quizTimer <= 0) { if (this._quizTimer <= 0) {
clearInterval(this._quizTimerInterval); clearInterval(this._quizTimerInterval);
...@@ -356,19 +321,20 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity { ...@@ -356,19 +321,20 @@ export class QuizEntity extends AbstractEntity implements IQuizEntity {
public requestReadingConfirmation(): void { public requestReadingConfirmation(): void {
this._readingConfirmationRequested = true; this._readingConfirmationRequested = true;
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.ReadingConfirmationRequested, step: MessageProtocol.ReadingConfirmationRequested,
payload: {}, payload: {},
})); })));
} }
public updatedMemberResponse(payload: object): void { public updatedMemberResponse(payload: object): void {
this._socketChannel.forEach(socket => SendSocketMessageService.sendMessage(socket, { AMQPConnector.channel.publish(this._exchangeName, '.*', Buffer.from(JSON.stringify({
status: StatusProtocol.Success, status: StatusProtocol.Success,
step: MessageProtocol.UpdatedResponse, step: MessageProtocol.UpdatedResponse,
payload, payload,
})); })));
if (MemberDAO.getMembersOfQuiz(this.name).every(nick => { if (MemberDAO.getMembersOfQuiz(this.name).every(nick => {
const val = nick.responses[this.currentQuestionIndex].value; const val = nick.responses[this.currentQuestionIndex].value;
return typeof val === 'number' ? val > -1 : val.length > 0; return typeof val === 'number' ? val > -1 : val.length > 0;
......
export enum QuizState { export enum QuizState {
Inactive, // Inactive = 'Inactive', //
Active, // Active = 'Active', //
Running, // Running = 'Running', //
Finished, // Finished = 'Finished', //
} }
import { ObjectId } from 'bson'; import { ObjectId } from 'bson';
import { DeleteWriteOpResultObject } from 'mongodb'; import { DeleteWriteOpResultObject } from 'mongodb';
import WebSocket from 'ws';
import { MemberEntity } from '../../entities/member/MemberEntity'; import { MemberEntity } from '../../entities/member/MemberEntity';
import { AbstractQuestionEntity } from '../../entities/question/AbstractQuestionEntity'; import { AbstractQuestionEntity } from '../../entities/question/AbstractQuestionEntity';
import { QuizState } from '../../enums/QuizState'; import { QuizState } from '../../enums/QuizState';
...@@ -30,12 +29,6 @@ export interface IQuizEntity extends IQuizBase { ...@@ -30,12 +29,6 @@ export interface IQuizEntity extends IQuizBase {
addQuestion(question: AbstractQuestionEntity, index: number): void; addQuestion(question: AbstractQuestionEntity, index: number): void;
addSocketToChannel(socket: WebSocket): void;
removeSocketFromChannel(socket: WebSocket): void;
containsSocket(socket: WebSocket): boolean;
updatedMemberResponse(payload: object): void; updatedMemberResponse(payload: object): void;
startNextQuestion(): void; startNextQuestion(): void;
...@@ -46,9 +39,9 @@ export interface IQuizEntity extends IQuizBase { ...@@ -46,9 +39,9 @@ export interface IQuizEntity extends IQuizBase {
onRemove(): void; onRemove(): void;
onMemberAdded(member: MemberEntity): void; onMemberAdded(member: MemberEntity): Promise<void>;
onMemberRemoved(memberEntity: MemberEntity): void; onMemberRemoved(memberEntity: MemberEntity): Promise<void>;
} }
export interface IQuizSerialized extends IQuizBase { export interface IQuizSerialized extends IQuizBase {
......
...@@ -7,7 +7,6 @@ import * as Minimist from 'minimist'; ...@@ -7,7 +7,6 @@ import * as Minimist from 'minimist';
import * as path from 'path'; import * as path from 'path';
import * as process from 'process'; import * as process from 'process';
import 'reflect-metadata'; import 'reflect-metadata';