...
 
Commits (2)
package main
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import utils.{MigrationConfig, Config}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
object Main extends App with Config with MigrationConfig with Routes with TestData {
private implicit val system = ActorSystem()
protected implicit val executor: ExecutionContext = system.dispatcher
protected val log: LoggingAdapter = Logging(system, getClass)
protected implicit val materializer: ActorMaterializer = ActorMaterializer()
import Shared._
//migrate()
//reloadSchema()
//populateDB
Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort)
//Await.ready(system.terminate(), 5.seconds)
}
package main
import akka.http.scaladsl.server.Directives._
import api._
......
package main
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContext
object Shared {
implicit val actorSystem = ActorSystem()
implicit val executor: ExecutionContext = actorSystem.dispatcher
val log: LoggingAdapter = Logging(actorSystem, getClass)
implicit val materializer: ActorMaterializer = ActorMaterializer()
}
package main
import models._
import services._
......
package api
import scala.concurrent.ExecutionContext.Implicits.global
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import models._
import akka.http.scaladsl.server.Directives._
import spray.json._
trait FeedbackApi {
import main.Shared._
val feedbackApi = pathPrefix("feedback") {
pathEndOrSingleSlash {
parameter("session".as[SessionId]) { sessionId =>
handleWebSocketMessages(FeedbackWrapper.findOrCreate(sessionId).websocketFlow())
}
}
}
}
package models
import akka.actor._
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream._
import akka.stream.scaladsl._
import GraphDSL.Implicits
import akka.NotUsed
class Feedback(sessionId: SessionId, implicit val actorSystem: ActorSystem) {
private[this] val feedbackActor = actorSystem.actorOf(Props(classOf[FeedbackActor], sessionId))
def websocketFlow(): Flow[Message, Message, _] =
Flow[Message, Message, NotUsed](Source.actorRef[FeedbackMessage](bufferSize = 5, OverflowStrategy.fail)) { implicit builder =>
feedbackSource =>
val fromWebsocket = builder.add(
Flow[Message].collect {
case TextMessage.Strict(txt) => IncomingFeedback(txt.toInt)
}
)
val backToWebsocket = builder.add(
Flow[FeedbackMessage].map {
case FeedbackMessage(fb) => TextMessage(fb.toString)
}
)
val feedbackActorSink = Sink.actorRef[FeedbackEvent](feedbackActor, UserLeft)
val merge = builder.add(Merge[FeedbackEvent](2))
val actorAsSource = builder.materializedValue.map(actor => UserJoined(actor))
fromWebsocket ~> merge.in(0)
actorAsSource -> merge.in(1)
merge ~> feedbackActorSink
feedbackSource ~> backToWebsocket
(fromWebsocket.inlet, backToWebsocket.outlet)
}
def sendMessage(msg: FeedbackMessage): Unit = feedbackActor ! msg
}
object Feedback {
def apply(sessionId: SessionId)(implicit actorSystem: ActorSystem): Feedback =
new Feedback(sessionId, actorSystem)
}
object FeedbackWrapper {
var feedbackSessionMap: Map[SessionId, Feedback] = Map.empty[SessionId, Feedback]
def findOrCreate(sessionId: SessionId)(implicit actorSystem: ActorSystem): Feedback =
feedbackSessionMap.getOrElse(sessionId, createFeedbackForSession(sessionId))
private def createFeedbackForSession(sessionId: SessionId)(implicit actorSystem: ActorSystem): Feedback = {
val feedback = Feedback(sessionId)
feedbackSessionMap += sessionId -> feedback
feedback
}
}
class FeedbackActor(sessionId: SessionId) extends Actor {
val differentFeedbackOptions: Int = 5
var participants: Map[String, ActorRef] = Map.empty[String, ActorRef]
var feedback: Array[Int] = new Array[Int](differentFeedbackOptions)
def sendFeedback(): Unit = {
participants.values.foreach(_ ! FeedbackMessage(feedback))
}
override def receive: Receive = {
case UserJoined(actorRef: ActorRef) =>
participants += "" -> actorRef
case IncomingFeedback(n: Int) =>
feedback(n) = feedback(n) + 1
sendFeedback()
}
}
case class FeedbackMessage(feedback: Array[Int])
sealed trait FeedbackEvent
case class UserJoined(actorRef: ActorRef) extends FeedbackEvent
case class UserLeft(actorRef: ActorRef) extends FeedbackEvent
case class IncomingFeedback(feedback: Int) extends FeedbackEvent