diff options
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r-- | src/main/scala/xyz/driver/core/messaging/StreamBus.scala | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala index aa99960..aabd3dc 100644 --- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala +++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala @@ -1,10 +1,10 @@ package xyz.driver.core package messaging +import akka.NotUsed import akka.stream.scaladsl.{Flow, Sink, Source} import scala.collection.mutable.ListBuffer -import scala.concurrent.ExecutionContext /** An extension to message buses that offers an Akka-Streams API. * @@ -23,11 +23,10 @@ import scala.concurrent.ExecutionContext * .run() * }}} */ trait StreamBus extends Bus { - implicit def executionContext: ExecutionContext /** Flow that publishes any messages to a given topic. - * Emits messages once have been published to the underlying bus. */ - def publish[A](topic: Topic[A]): Flow[A, A, _] = { + * Emits messages once they have been published to the underlying bus. */ + def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = { Flow[A] .batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _) .mapAsync(1) { a => @@ -37,7 +36,7 @@ trait StreamBus extends Bus { } /** Sink that acknowledges the receipt of a message. */ - def acknowledge: Sink[MessageId, _] = { + def acknowledge: Sink[MessageId, NotUsed] = { Flow[MessageId] .batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _) .mapAsync(1)(acknowledgeMessages(_)) @@ -45,7 +44,9 @@ trait StreamBus extends Bus { } /** Source that listens to a subscription and receives any messages sent to its topic. */ - def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = { + def subscribe[A]( + topic: Topic[A], + config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = { Source .unfoldAsync((topic, config))( topicAndConfig => |