aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala13
1 files changed, 7 insertions, 6 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
index aa99960..aabd3dc 100644
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -1,10 +1,10 @@
package xyz.driver.core
package messaging
+import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable.ListBuffer
-import scala.concurrent.ExecutionContext
/** An extension to message buses that offers an Akka-Streams API.
*
@@ -23,11 +23,10 @@ import scala.concurrent.ExecutionContext
* .run()
* }}} */
trait StreamBus extends Bus {
- implicit def executionContext: ExecutionContext
/** Flow that publishes any messages to a given topic.
- * Emits messages once have been published to the underlying bus. */
- def publish[A](topic: Topic[A]): Flow[A, A, _] = {
+ * Emits messages once they have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
Flow[A]
.batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
.mapAsync(1) { a =>
@@ -37,7 +36,7 @@ trait StreamBus extends Bus {
}
/** Sink that acknowledges the receipt of a message. */
- def acknowledge: Sink[MessageId, _] = {
+ def acknowledge: Sink[MessageId, NotUsed] = {
Flow[MessageId]
.batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
.mapAsync(1)(acknowledgeMessages(_))
@@ -45,7 +44,9 @@ trait StreamBus extends Bus {
}
/** Source that listens to a subscription and receives any messages sent to its topic. */
- def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = {
+ def subscribe[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
Source
.unfoldAsync((topic, config))(
topicAndConfig =>