Commit df6c0b75 authored by Tom Käsler's avatar Tom Käsler

stash

parent 2cc50590
Pipeline #6782 failed with stage
in 1 minute and 9 seconds
package main
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
......@@ -10,10 +12,7 @@ import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
object Main extends App with Config with MigrationConfig with Routes with TestData {
private implicit val system = ActorSystem()
protected implicit val executor: ExecutionContext = system.dispatcher
protected val log: LoggingAdapter = Logging(system, getClass)
protected implicit val materializer: ActorMaterializer = ActorMaterializer()
import Shared._
//migrate()
//reloadSchema()
......
package main
import akka.http.scaladsl.server.Directives._
import api._
......
package main
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContext
object Shared {
implicit val actorSystem = ActorSystem()
implicit val executor: ExecutionContext = actorSystem.dispatcher
val log: LoggingAdapter = Logging(actorSystem, getClass)
implicit val materializer: ActorMaterializer = ActorMaterializer()
}
package main
import models._
import services._
......
......@@ -7,6 +7,7 @@ import akka.http.scaladsl.server.Directives._
import spray.json._
trait FeedbackApi {
import main.Shared._
val feedbackApi = pathPrefix("feedback") {
pathEndOrSingleSlash {
parameter("session".as[SessionId]) { sessionId =>
......
package models
import akka.actor._
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream._
import akka.stream.scaladsl._
import GraphDSL.Implicits
import akka.NotUsed
class Feedback(sessionId: SessionId, actorSystem: ActorSystem) {
class Feedback(sessionId: SessionId, implicit val actorSystem: ActorSystem) {
private[this] val feedbackActor = actorSystem.actorOf(Props(classOf[FeedbackActor], sessionId))
def websocketFlow(): Flow[Message, Message, Any] =
Flow(Source.actorRef[FeedbackMessage](500, OverflowStrategy.fail)) { implicit builder =>
chatSource =>
def websocketFlow(): Flow[Message, Message, _] =
Flow[Message, Message, NotUsed](Source.actorRef[FeedbackMessage](bufferSize = 5, OverflowStrategy.fail)) { implicit builder =>
feedbackSource =>
val fromWebsocket = builder.add(
Flow[Message].collect {
case TextMessage.Strict(txt) => IncomingFeedback(txt.toInt)
......@@ -24,10 +26,24 @@ class Feedback(sessionId: SessionId, actorSystem: ActorSystem) {
}
)
val feedbackActorSink = Sink.actorRef[FeedbackEvent](feedbackActor, UserLeft)
val merge = builder.add(Merge[FeedbackEvent](2))
val actorAsSource = builder.materializedValue.map(actor => UserJoined(actor))
fromWebsocket ~> merge.in(0)
actorAsSource -> merge.in(1)
merge ~> feedbackActorSink
feedbackSource ~> backToWebsocket
(fromWebsocket.inlet, backToWebsocket.outlet)
}
//def sendMessage(msg: )
def sendMessage(msg: FeedbackMessage): Unit = feedbackActor ! msg
}
object Feedback {
......@@ -60,6 +76,9 @@ class FeedbackActor(sessionId: SessionId) extends Actor {
override def receive: Receive = {
case UserJoined(actorRef: ActorRef) =>
participants += "" -> actorRef
case IncomingFeedback(n: Int) =>
feedback(n) = feedback(n) + 1
sendFeedback()
}
}
......@@ -69,4 +88,6 @@ sealed trait FeedbackEvent
case class UserJoined(actorRef: ActorRef) extends FeedbackEvent
case class UserLeft(actorRef: ActorRef) extends FeedbackEvent
case class IncomingFeedback(feedback: Int) extends FeedbackEvent
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment