From f07bee564b11ee76fd065ec849a888bcf4e74e85 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 10 Sep 2018 15:10:09 -0700 Subject: Various message bus fixes (#212) 1. Move to pure mixin-based ("stackable traits") pattern. 2. Provide a "CreateOnDemand" mixin that ensures topics and subscriptions have been created before they are used. --- src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala') diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala index aa99960..aabd3dc 100644 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -1,10 +1,10 @@ package xyz.driver.core package messaging +import akka.NotUsed import akka.stream.scaladsl.{Flow, Sink, Source} import scala.collection.mutable.ListBuffer -import scala.concurrent.ExecutionContext /** An extension to message buses that offers an Akka-Streams API. * @@ -23,11 +23,10 @@ import scala.concurrent.ExecutionContext * .run() * }}} */ trait StreamBus extends Bus { - implicit def executionContext: ExecutionContext /** Flow that publishes any messages to a given topic. - * Emits messages once have been published to the underlying bus. */ - def publish[A](topic: Topic[A]): Flow[A, A, _] = { + * Emits messages once they have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { Flow[A] .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) .mapAsync(1) { a => @@ -37,7 +36,7 @@ trait StreamBus extends Bus { } /** Sink that acknowledges the receipt of a message. */ - def acknowledge: Sink[MessageId, _] = { + def acknowledge: Sink[MessageId, NotUsed] = { Flow[MessageId] .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) .mapAsync(1)(acknowledgeMessages(_)) @@ -45,7 +44,9 @@ trait StreamBus extends Bus { } /** Source that listens to a subscription and receives any messages sent to its topic. */ - def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = { + def subscribe[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { Source .unfoldAsync((topic, config))( topicAndConfig => -- cgit v1.2.3