From 8ecae787ff7124b008229d958c579c73649dd9e4 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 6 May 2018 13:56:16 -0700 Subject: Initial commit --- server/src/main/scala/LiveMessages.scala | 18 +++++++ server/src/main/scala/Main.scala | 28 +++++++++++ server/src/main/scala/Repository.scala | 49 +++++++++++++++++++ server/src/main/scala/Routes.scala | 80 ++++++++++++++++++++++++++++++++ 4 files changed, 175 insertions(+) create mode 100644 server/src/main/scala/LiveMessages.scala create mode 100644 server/src/main/scala/Main.scala create mode 100644 server/src/main/scala/Repository.scala create mode 100644 server/src/main/scala/Routes.scala (limited to 'server') diff --git a/server/src/main/scala/LiveMessages.scala b/server/src/main/scala/LiveMessages.scala new file mode 100644 index 0000000..710c1dd --- /dev/null +++ b/server/src/main/scala/LiveMessages.scala @@ -0,0 +1,18 @@ +package triad + +import akka.NotUsed +import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete} +import akka.stream.{Materializer, OverflowStrategy} + +class LiveMessages(implicit materializer: Materializer) { + + private val (in: SourceQueueWithComplete[Message], + out: Source[Message, NotUsed]) = Source + .queue[Message](10, OverflowStrategy.dropTail) + .toMat(BroadcastHub.sink[Message])(Keep.both) + .run() + + def push(message: Message) = in.offer(message) + def feed = out + +} diff --git a/server/src/main/scala/Main.scala b/server/src/main/scala/Main.scala new file mode 100644 index 0000000..8db8873 --- /dev/null +++ b/server/src/main/scala/Main.scala @@ -0,0 +1,28 @@ +package triad + +import java.nio.file.{Files, Paths} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.stream.ActorMaterializer + +import scala.concurrent._ +import scala.concurrent.duration._ + +object Main extends App { + + implicit val system = ActorSystem("triad") + implicit val materializer = ActorMaterializer() + + val repository = { + Files.deleteIfExists(Paths.get("database.sqlite")) + Repository.sqlite("database.sqlite") + } + val liveMessages = new LiveMessages + val routes = new Routes(repository, liveMessages) + + Await.result(repository.database.run(repository.initAction), 10.seconds) + + Await.result(Http().bindAndHandle(routes.all, "localhost", 9090), 10.seconds) + +} diff --git a/server/src/main/scala/Repository.scala b/server/src/main/scala/Repository.scala new file mode 100644 index 0000000..003ac92 --- /dev/null +++ b/server/src/main/scala/Repository.scala @@ -0,0 +1,49 @@ +package triad + +import java.time.Instant + +import slick.jdbc.{JdbcProfile, SQLiteProfile} + +class Repository(val profile: JdbcProfile, url: String, driver: String) { + val database: profile.backend.DatabaseDef = + profile.api.Database.forURL(url, driver) + + import profile.api._ + + implicit val instantColumnType = MappedColumnType.base[Instant, Long]( + { i => + i.toEpochMilli() + }, { l => + Instant.ofEpochMilli(l) + } + ) + + class Messages(tag: Tag) extends Table[Message](tag, "messages") { + def id = column[String]("id") + def content = column[String]("content") + def author = column[String]("author") + def timestamp = column[Instant]("timestamp") + def * = + (id, content, author, timestamp) <> ({ cols => + Message(cols._2, cols._3, cols._4) + }, { message: Message => + Some((message.id, message.content, message.author, message.timestamp)) + }) + def pk = primaryKey("pk", id) + } + + val Messages = TableQuery[Messages] + + def initAction = DBIO.seq( + Messages.schema.create, + Messages += Message("first!", "John Smith") + ) + +} + +object Repository { + + def sqlite(name: String) = + new Repository(SQLiteProfile, s"jdbc:sqlite:$name", "org.sqlite.JDBC") + +} diff --git a/server/src/main/scala/Routes.scala b/server/src/main/scala/Routes.scala new file mode 100644 index 0000000..d39fa18 --- /dev/null +++ b/server/src/main/scala/Routes.scala @@ -0,0 +1,80 @@ +package triad + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ +import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller} +import akka.http.scaladsl.model.MediaTypes +import akka.http.scaladsl.model.sse.ServerSentEvent +import akka.http.scaladsl.server.Directives._ +import akka.stream.scaladsl.Source +import spray.json._ +import triad.ApiProtocol._ + +import scala.concurrent.duration._ + +class Routes(repository: Repository, liveMessages: LiveMessages) { + import repository.profile.api._ + + // allows using scalatags templates as HTTP responses + implicit val tagMarshaller: ToEntityMarshaller[scalatags.Text.Tag] = { + Marshaller.stringMarshaller(MediaTypes.`text/html`).compose { + (tag: scalatags.Text.Tag) => + tag.render + } + } + + private val lastMessages = repository.Messages.take(100).result + + private val messageStream: Source[Message, _] = { + val publisher = repository.database.stream(lastMessages) + Source + .fromPublisher(publisher) + .concat(liveMessages.feed) + } + + val messages = path("messages") { + get { + onSuccess(repository.database.run(lastMessages)) { messages => + complete(messages) + } + } ~ post { + entity(as[Message]) { message => + extractExecutionContext { implicit ec => + val query = repository.Messages.insertOrUpdate(message) + val action = repository.database.run(query).flatMap { _ => + liveMessages.push(message) + } + onSuccess(action) { _ => + complete(message) + } + } + } + } + } + + val ui = pathEndOrSingleSlash { + get { + parameter("js".as[Boolean] ? true) { js => + onSuccess(repository.database.run(lastMessages)) { messages => + complete(TextTemplates.page(messages, js)) + } + } + } + } + + val live = path("live") { + get { + val src = messageStream + .map(msg => ServerSentEvent(msg.toJson.compactPrint)) + .keepAlive(10.seconds, () => ServerSentEvent.heartbeat) + complete(src) + } + } + + val assets = pathPrefix("assets") { + getFromResourceDirectory("assets") + } + + def all = messages ~ ui ~ live ~ assets + +} -- cgit v1.2.3