From ddf03f7209454ef53bd93548911edc98236d0f13 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=20K=C3=A4sler?= <tom.kaesler@mni.thm.de>
Date: Wed, 17 Apr 2019 15:48:44 +0200
Subject: [PATCH] WebSocket Connector

---
 src/app/app.module.ts                         | 14 +----
 .../comment-list/comment-list.component.ts    |  4 +-
 src/app/rx-stomp.config.ts                    |  9 ++-
 .../services/http/authentication.service.ts   |  4 ++
 .../websockets/ws-comment-service.service.ts  | 42 ++++----------
 .../websockets/ws-connector.service.ts        | 58 +++++++++++++++++++
 .../websockets/ws-feedback.service.ts         | 14 ++---
 7 files changed, 89 insertions(+), 56 deletions(-)
 create mode 100644 src/app/services/websockets/ws-connector.service.ts

diff --git a/src/app/app.module.ts b/src/app/app.module.ts
index 83012f4e1..83b65385d 100644
--- a/src/app/app.module.ts
+++ b/src/app/app.module.ts
@@ -14,6 +14,7 @@ import { CommentService } from './services/http/comment.service';
 import { DataStoreService } from './services/util/data-store.service';
 import { ContentService } from './services/http/content.service';
 import { ContentAnswerService } from './services/http/content-answer.service';
+import { WsConnectorService } from './services/websockets/ws-connector.service';
 import { UserActivationComponent } from './components/home/_dialogs/user-activation/user-activation.component';
 import { AuthenticationInterceptor } from './interceptors/authentication.interceptor';
 import { EssentialsModule } from './components/essentials/essentials.module';
@@ -24,8 +25,6 @@ import { LanguageService } from './services/util/language.service';
 import { MarkdownService, MarkedOptions } from 'ngx-markdown';
 import { NewLandingComponent } from './components/home/new-landing/new-landing.component';
 import { HomePageComponent } from './components/home/home-page/home-page.component';
-import { InjectableRxStompConfig, RxStompService, rxStompServiceFactory } from '@stomp/ng2-stompjs';
-import { myRxStompConfig } from './rx-stomp.config';
 import { AppConfig } from './app.config';
 
 export function dialogClose(dialogResult: any) {
@@ -67,15 +66,7 @@ export function initializeApp(appConfig: AppConfig) {
       useClass: AuthenticationInterceptor,
       multi: true
     },
-    {
-      provide: InjectableRxStompConfig,
-      useValue: myRxStompConfig
-    },
-    {
-      provide: RxStompService,
-      useFactory: rxStompServiceFactory,
-      deps: [InjectableRxStompConfig]
-    },
+    WsConnectorService,
     NotificationService,
     AuthenticationService,
     AuthenticationGuard,
@@ -88,6 +79,7 @@ export function initializeApp(appConfig: AppConfig) {
     MarkdownService,
     MarkedOptions,
     UserService,
+    WsConnectorService,
     {
       provide: MatDialogRef,
       useValue: {
diff --git a/src/app/components/shared/comment-list/comment-list.component.ts b/src/app/components/shared/comment-list/comment-list.component.ts
index 506856101..1f3c0a293 100644
--- a/src/app/components/shared/comment-list/comment-list.component.ts
+++ b/src/app/components/shared/comment-list/comment-list.component.ts
@@ -3,7 +3,6 @@ import { Comment } from '../../../models/comment';
 import { CommentService } from '../../../services/http/comment.service';
 import { TranslateService } from '@ngx-translate/core';
 import { LanguageService } from '../../../services/util/language.service';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { Message } from '@stomp/stompjs';
 import { SubmitCommentComponent } from '../_dialogs/submit-comment/submit-comment.component';
 import { MatDialog } from '@angular/material';
@@ -30,7 +29,6 @@ export class CommentListComponent implements OnInit {
     private translateService: TranslateService,
     public dialog: MatDialog,
     protected langService: LanguageService,
-    private rxStompService: RxStompService,
     private wsCommentService: WsCommentServiceService,
     private authenticationService: AuthenticationService) {
     langService.langEmitter.subscribe(lang => translateService.use(lang));
@@ -40,7 +38,7 @@ export class CommentListComponent implements OnInit {
     this.roomId = localStorage.getItem(`roomId`);
     this.comments = [];
     this.hideCommentsList = false;
-    this.rxStompService.watch(`/topic/${this.roomId}.comment.stream`).subscribe((message: Message) => {
+    this.wsCommentService.getCommentStream(this.roomId).subscribe((message: Message) => {
       this.parseIncomingMessage(message);
     });
     this.getComments();
diff --git a/src/app/rx-stomp.config.ts b/src/app/rx-stomp.config.ts
index aaded217c..139561d1f 100644
--- a/src/app/rx-stomp.config.ts
+++ b/src/app/rx-stomp.config.ts
@@ -1,9 +1,14 @@
-import { InjectableRxStompConfig } from '@stomp/ng2-stompjs';
+import { RxStompConfig } from '@stomp/rx-stomp';
 
-export const myRxStompConfig: InjectableRxStompConfig = {
+export const ARSRxStompConfig: RxStompConfig = {
   // Which server?
   brokerURL: `ws://${window.location.hostname}:8080/ws/websocket`,
 
+  connectHeaders: {
+    login: 'guest',
+    password: 'guest'
+  },
+
   // How often to heartbeat?
   // Interval in milliseconds, set to 0 to disable
   heartbeatIncoming: 0, // Typical value 0 - disabled
diff --git a/src/app/services/http/authentication.service.ts b/src/app/services/http/authentication.service.ts
index 2ac6be1b6..32896f412 100644
--- a/src/app/services/http/authentication.service.ts
+++ b/src/app/services/http/authentication.service.ts
@@ -130,4 +130,8 @@ export class AuthenticationService {
   get watchUser() {
     return this.user.asObservable();
   }
+
+  getUserAsSubject(): BehaviorSubject<User> {
+    return this.user;
+  }
 }
diff --git a/src/app/services/websockets/ws-comment-service.service.ts b/src/app/services/websockets/ws-comment-service.service.ts
index 7384404df..ffc16dc42 100644
--- a/src/app/services/websockets/ws-comment-service.service.ts
+++ b/src/app/services/websockets/ws-comment-service.service.ts
@@ -1,11 +1,13 @@
 import { Injectable } from '@angular/core';
 import { Comment } from '../../models/comment';
-import { RxStompService } from '@stomp/ng2-stompjs';
+import { WsConnectorService } from '../../services/websockets/ws-connector.service';
 import { CreateComment } from '../../models/messages/create-comment';
 import { PatchComment } from '../../models/messages/patch-comment';
 import { TSMap } from 'typescript-map';
 import { UpVote } from '../../models/messages/up-vote';
 import { DownVote } from '../../models/messages/down-vote';
+import { Observable } from 'rxjs';
+import { IMessage } from '@stomp/stompjs';
 
 
 @Injectable({
@@ -13,17 +15,11 @@ import { DownVote } from '../../models/messages/down-vote';
 })
 export class WsCommentServiceService {
 
-  constructor(private rxStompService: RxStompService) { }
+  constructor(private wsConnector: WsConnectorService) { }
 
   add(comment: Comment): void {
     const message = new CreateComment(comment.roomId, comment.userId, comment.body);
-    this.rxStompService.publish({
-      destination: `/queue/comment.command.create`,
-      body: JSON.stringify(message),
-      headers: {
-        'content-type': 'application/json'
-      }
-    });
+    this.wsConnector.send(`/queue/comment.command.create`, JSON.stringify(message));
   }
 
   toggleRead(comment: Comment): Comment {
@@ -53,34 +49,20 @@ export class WsCommentServiceService {
 
   voteUp(comment: Comment): void {
     const message = new UpVote(comment.userId, comment.id);
-    this.rxStompService.publish({
-      destination: `/queue/vote.command.upvote`,
-      body: JSON.stringify(message),
-      headers: {
-        'content-type': 'application/json'
-      }
-    });
+    this.wsConnector.send(`/queue/vote.command.upvote`, JSON.stringify(message));
   }
 
   voteDown(comment: Comment): void {
     const message = new DownVote(comment.userId, comment.id);
-    this.rxStompService.publish({
-      destination: `/queue/vote.command.downvote`,
-      body: JSON.stringify(message),
-      headers: {
-        'content-type': 'application/json'
-      }
-    });
+    this.wsConnector.send(`/queue/vote.command.downvote`, JSON.stringify(message));
   }
 
   private patchComment(comment: Comment, changes: TSMap<string, any>): void {
     const message = new PatchComment(comment.id, changes);
-      this.rxStompService.publish({
-        destination: `/queue/comment.command.patch`,
-        body: JSON.stringify(message),
-        headers: {
-          'content-type': 'application/json'
-        }
-      });
+    this.wsConnector.send(`/queue/comment.command.patch`, JSON.stringify(message));
+  }
+
+  getCommentStream(roomId: string): Observable<IMessage> {
+    return this.wsConnector.getWatcher(`/topic/${roomId}.comment.stream`);
   }
 }
diff --git a/src/app/services/websockets/ws-connector.service.ts b/src/app/services/websockets/ws-connector.service.ts
new file mode 100644
index 000000000..32007e8a9
--- /dev/null
+++ b/src/app/services/websockets/ws-connector.service.ts
@@ -0,0 +1,58 @@
+import { Injectable } from '@angular/core';
+import { RxStomp } from '@stomp/rx-stomp';
+import { AuthenticationService } from '../http/authentication.service';
+import { User } from '../../models/user';
+import { ARSRxStompConfig } from '../../rx-stomp.config';
+import { Observable } from 'rxjs';
+import { IMessage, StompHeaders } from '@stomp/stompjs';
+
+@Injectable({
+  providedIn: 'root'
+})
+export class WsConnectorService {
+  private client: RxStomp;
+
+  private headers = {
+    'content-type': 'application/json',
+    'ars-user-id': ''
+  };
+
+  constructor(
+    private authService: AuthenticationService
+  ) {
+    this.client = new RxStomp();
+    const userSubject = authService.getUserAsSubject();
+    userSubject.subscribe((user: User) => {
+      if (this.client.connected) {
+        this.client.deactivate();
+      }
+
+      if (user && user.id) {
+        const copiedConf = ARSRxStompConfig;
+        copiedConf.connectHeaders.token = user.token;
+        this.headers = {
+          'content-type': 'application/json',
+          'ars-user-id': '' + user.id
+        };
+        this.client.configure(copiedConf);
+        this.client.activate();
+      }
+    });
+  }
+
+  public send(destination: string, body: string): void {
+    if (this.client.connected) {
+      this.client.publish({
+        destination: destination,
+        body: body,
+        headers: this.headers
+      });
+    }
+  }
+
+  public getWatcher(topic: string): Observable<IMessage> {
+    if (this.client.connected) {
+      return this.client.watch(topic, this.headers);
+    }
+  }
+}
diff --git a/src/app/services/websockets/ws-feedback.service.ts b/src/app/services/websockets/ws-feedback.service.ts
index 68bbb9d02..668095d1c 100644
--- a/src/app/services/websockets/ws-feedback.service.ts
+++ b/src/app/services/websockets/ws-feedback.service.ts
@@ -1,5 +1,5 @@
 import { Injectable } from '@angular/core';
-import { RxStompService } from '@stomp/ng2-stompjs';
+import { WsConnectorService } from '../../services/websockets/ws-connector.service';
 import { CreateFeedback } from '../../models/messages/create-feedback';
 import { GetFeedback } from '../../models/messages/get-feedback';
 
@@ -7,22 +7,16 @@ import { GetFeedback } from '../../models/messages/get-feedback';
   providedIn: 'root'
 })
 export class WsFeedbackService {
-  constructor(private rxStompService: RxStompService) {}
+  constructor(private wsConnector: WsConnectorService) {}
 
   send(feedback: number, roomId: string) {
     const createFeedback = new CreateFeedback(feedback);
-    this.rxStompService.publish({
-      destination: `/backend/queue/${roomId}.feedback.command`,
-      body: JSON.stringify(createFeedback)
-    });
+    this.wsConnector.send(`/backend/queue/${roomId}.feedback.command`, JSON.stringify(createFeedback));
   }
 
   get(roomId: string) {
     const getFeedback = new GetFeedback();
 
-    this.rxStompService.publish({
-      destination: `/backend/queue/${roomId}.feedback.query`,
-      body: JSON.stringify(getFeedback)
-    });
+    this.wsConnector.send(`/backend/queue/${roomId}.feedback.query`, JSON.stringify(getFeedback));
   }
 }
-- 
GitLab