aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server/src/LiveMessages.scala11
-rw-r--r--server/src/Main.scala37
-rw-r--r--server/src/Repository.scala1
-rw-r--r--server/src/Routes.scala7
4 files changed, 41 insertions, 15 deletions
diff --git a/server/src/LiveMessages.scala b/server/src/LiveMessages.scala
index b2d27c1..307dd81 100644
--- a/server/src/LiveMessages.scala
+++ b/server/src/LiveMessages.scala
@@ -7,8 +7,10 @@ import akka.stream.scaladsl.{
Source,
SourceQueueWithComplete
}
-import akka.stream.{Materializer, OverflowStrategy}
+import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
+import scala.concurrent.Future
+/** A very basic streaming message router. */
class LiveMessages(implicit materializer: Materializer) {
private val (
@@ -19,7 +21,10 @@ class LiveMessages(implicit materializer: Materializer) {
.toMat(BroadcastHub.sink[Message])(Keep.both)
.run()
- def push(message: Message) = in.offer(message)
- def feed = out
+ /** Push a single message into the stream. */
+ def push(message: Message): Future[QueueOfferResult] = in.offer(message)
+
+ /** Obtain a stream of the message feed. */
+ def feed: Source[Message, NotUsed] = out
}
diff --git a/server/src/Main.scala b/server/src/Main.scala
index 51e4f28..91649a2 100644
--- a/server/src/Main.scala
+++ b/server/src/Main.scala
@@ -11,18 +11,39 @@ import scala.concurrent.duration._
object Main extends App {
+ def log(message: String) = System.err.println(message)
+
+ log("Initializing contexts...")
implicit val system = ActorSystem("triad")
implicit val materializer = ActorMaterializer()
-
- val repository = {
- Files.deleteIfExists(Paths.get("database.sqlite"))
- Repository.sqlite("database.sqlite")
+ system.registerOnTermination {
+ log("Bye!")
}
- val liveMessages = new LiveMessages
- val routes = new Routes(repository, liveMessages)
- Await.result(repository.database.run(repository.initAction), 10.seconds)
+ try {
+ log("Initializing database...")
+ val repository = {
+ Files.deleteIfExists(Paths.get("database.sqlite"))
+ Repository.sqlite("database.sqlite")
+ }
+
+ log("Preparing live message relay...")
+ val liveMessages = new LiveMessages
+
+ log("Setting up routes...")
+ val routes = new Routes(repository, liveMessages)
- Await.result(Http().bindAndHandle(routes.all, "0.0.0.0", 9090), 10.seconds)
+ log("Populating database tables...")
+ Await.result(repository.database.run(repository.initAction), 10.seconds)
+
+ log("Binding to network...")
+ Await.result(Http().bindAndHandle(routes.all, "0.0.0.0", 9090), 10.seconds)
+
+ log("Ready")
+ } catch {
+ case ex: Exception =>
+ log("Error in initialization. Shutting down...")
+ system.terminate()
+ }
}
diff --git a/server/src/Repository.scala b/server/src/Repository.scala
index 003ac92..fe7ecab 100644
--- a/server/src/Repository.scala
+++ b/server/src/Repository.scala
@@ -4,6 +4,7 @@ import java.time.Instant
import slick.jdbc.{JdbcProfile, SQLiteProfile}
+/** Slick wrapper around the persisted message database. */
class Repository(val profile: JdbcProfile, url: String, driver: String) {
val database: profile.backend.DatabaseDef =
profile.api.Database.forURL(url, driver)
diff --git a/server/src/Routes.scala b/server/src/Routes.scala
index 0b08a11..d1bbefd 100644
--- a/server/src/Routes.scala
+++ b/server/src/Routes.scala
@@ -25,12 +25,11 @@ class Routes(repository: Repository, liveMessages: LiveMessages) {
private val lastMessages = repository.Messages.take(100).result
- private val messageStream: Source[Message, _] = {
- val publisher = repository.database.stream(lastMessages)
+ // stream persisted messages first, followed by live ones
+ private val messageStream: Source[Message, _] =
Source
- .fromPublisher(publisher)
+ .fromPublisher(repository.database.stream(lastMessages))
.concat(liveMessages.feed)
- }
val messages = path("messages") {
get {