From 3de6739b8b6f04956c8908bc9717990807254a6b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Fri, 11 Oct 2019 20:38:55 -0400 Subject: add logging and some comments --- server/src/LiveMessages.scala | 11 ++++++++--- server/src/Main.scala | 37 +++++++++++++++++++++++++++++-------- server/src/Repository.scala | 1 + server/src/Routes.scala | 7 +++---- 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/server/src/LiveMessages.scala b/server/src/LiveMessages.scala index b2d27c1..307dd81 100644 --- a/server/src/LiveMessages.scala +++ b/server/src/LiveMessages.scala @@ -7,8 +7,10 @@ import akka.stream.scaladsl.{ Source, SourceQueueWithComplete } -import akka.stream.{Materializer, OverflowStrategy} +import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} +import scala.concurrent.Future +/** A very basic streaming message router. */ class LiveMessages(implicit materializer: Materializer) { private val ( @@ -19,7 +21,10 @@ class LiveMessages(implicit materializer: Materializer) { .toMat(BroadcastHub.sink[Message])(Keep.both) .run() - def push(message: Message) = in.offer(message) - def feed = out + /** Push a single message into the stream. */ + def push(message: Message): Future[QueueOfferResult] = in.offer(message) + + /** Obtain a stream of the message feed. */ + def feed: Source[Message, NotUsed] = out } diff --git a/server/src/Main.scala b/server/src/Main.scala index 51e4f28..91649a2 100644 --- a/server/src/Main.scala +++ b/server/src/Main.scala @@ -11,18 +11,39 @@ import scala.concurrent.duration._ object Main extends App { + def log(message: String) = System.err.println(message) + + log("Initializing contexts...") implicit val system = ActorSystem("triad") implicit val materializer = ActorMaterializer() - - val repository = { - Files.deleteIfExists(Paths.get("database.sqlite")) - Repository.sqlite("database.sqlite") + system.registerOnTermination { + log("Bye!") } - val liveMessages = new LiveMessages - val routes = new Routes(repository, liveMessages) - Await.result(repository.database.run(repository.initAction), 10.seconds) + try { + log("Initializing database...") + val repository = { + Files.deleteIfExists(Paths.get("database.sqlite")) + Repository.sqlite("database.sqlite") + } + + log("Preparing live message relay...") + val liveMessages = new LiveMessages + + log("Setting up routes...") + val routes = new Routes(repository, liveMessages) - Await.result(Http().bindAndHandle(routes.all, "0.0.0.0", 9090), 10.seconds) + log("Populating database tables...") + Await.result(repository.database.run(repository.initAction), 10.seconds) + + log("Binding to network...") + Await.result(Http().bindAndHandle(routes.all, "0.0.0.0", 9090), 10.seconds) + + log("Ready") + } catch { + case ex: Exception => + log("Error in initialization. Shutting down...") + system.terminate() + } } diff --git a/server/src/Repository.scala b/server/src/Repository.scala index 003ac92..fe7ecab 100644 --- a/server/src/Repository.scala +++ b/server/src/Repository.scala @@ -4,6 +4,7 @@ import java.time.Instant import slick.jdbc.{JdbcProfile, SQLiteProfile} +/** Slick wrapper around the persisted message database. */ class Repository(val profile: JdbcProfile, url: String, driver: String) { val database: profile.backend.DatabaseDef = profile.api.Database.forURL(url, driver) diff --git a/server/src/Routes.scala b/server/src/Routes.scala index 0b08a11..d1bbefd 100644 --- a/server/src/Routes.scala +++ b/server/src/Routes.scala @@ -25,12 +25,11 @@ class Routes(repository: Repository, liveMessages: LiveMessages) { private val lastMessages = repository.Messages.take(100).result - private val messageStream: Source[Message, _] = { - val publisher = repository.database.stream(lastMessages) + // stream persisted messages first, followed by live ones + private val messageStream: Source[Message, _] = Source - .fromPublisher(publisher) + .fromPublisher(repository.database.stream(lastMessages)) .concat(liveMessages.feed) - } val messages = path("messages") { get { -- cgit v1.2.3