aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2018-09-10 15:10:09 -0700
committerGitHub <noreply@github.com>2018-09-10 15:10:09 -0700
commitf07bee564b11ee76fd065ec849a888bcf4e74e85 (patch)
tree538330c7592ccd4f555a07bf03aa91cace403be5 /src/main/scala/xyz/driver/core/messaging/StreamBus.scala
parent93ffc0a5c4b54a8beea10c3fd68e7a2d70c4c771 (diff)
downloaddriver-core-f07bee564b11ee76fd065ec849a888bcf4e74e85.tar.gz
driver-core-f07bee564b11ee76fd065ec849a888bcf4e74e85.tar.bz2
driver-core-f07bee564b11ee76fd065ec849a888bcf4e74e85.zip
Various message bus fixes (#212)v1.14.0
1. Move to pure mixin-based ("stackable traits") pattern. 2. Provide a "CreateOnDemand" mixin that ensures topics and subscriptions have been created before they are used.
Diffstat (limited to 'src/main/scala/xyz/driver/core/messaging/StreamBus.scala')
-rw-r--r--src/main/scala/xyz/driver/core/messaging/StreamBus.scala13
1 files changed, 7 insertions, 6 deletions
diff --git a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
index aa99960..aabd3dc 100644
--- a/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
+++ b/src/main/scala/xyz/driver/core/messaging/StreamBus.scala
@@ -1,10 +1,10 @@
package xyz.driver.core
package messaging
+import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable.ListBuffer
-import scala.concurrent.ExecutionContext
/** An extension to message buses that offers an Akka-Streams API.
*
@@ -23,11 +23,10 @@ import scala.concurrent.ExecutionContext
* .run()
* }}} */
trait StreamBus extends Bus {
- implicit def executionContext: ExecutionContext
/** Flow that publishes any messages to a given topic.
- * Emits messages once have been published to the underlying bus. */
- def publish[A](topic: Topic[A]): Flow[A, A, _] = {
+ * Emits messages once they have been published to the underlying bus. */
+ def publish[A](topic: Topic[A]): Flow[A, A, NotUsed] = {
Flow[A]
.batch(defaultMaxMessages.toLong, a => ListBuffer[A](a))(_ += _)
.mapAsync(1) { a =>
@@ -37,7 +36,7 @@ trait StreamBus extends Bus {
}
/** Sink that acknowledges the receipt of a message. */
- def acknowledge: Sink[MessageId, _] = {
+ def acknowledge: Sink[MessageId, NotUsed] = {
Flow[MessageId]
.batch(defaultMaxMessages.toLong, a => ListBuffer[MessageId](a))(_ += _)
.mapAsync(1)(acknowledgeMessages(_))
@@ -45,7 +44,9 @@ trait StreamBus extends Bus {
}
/** Source that listens to a subscription and receives any messages sent to its topic. */
- def subscribe[A](topic: Topic[A], config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], _] = {
+ def subscribe[A](
+ topic: Topic[A],
+ config: SubscriptionConfig = defaultSubscriptionConfig): Source[Message[A], NotUsed] = {
Source
.unfoldAsync((topic, config))(
topicAndConfig =>