aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-handler/monix/src/main
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-07-24 16:57:51 +0200
committeradamw <adam@warski.org>2017-07-24 16:57:51 +0200
commitb1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6 (patch)
tree6ef95abd69930cabb5b7566507af6dc56d25ebaf /async-http-client-handler/monix/src/main
parent95fee5083274bf0e856af8b868702f8965b92f1a (diff)
downloadsttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.gz
sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.bz2
sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.zip
Adding streaming to the monix async http client handler
Diffstat (limited to 'async-http-client-handler/monix/src/main')
-rw-r--r--async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala49
1 files changed, 41 insertions, 8 deletions
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
index fab9c98..c77e6d9 100644
--- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
@@ -1,32 +1,65 @@
package com.softwaremill.sttp.asynchttpclient.monix
+import java.nio.ByteBuffer
+
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
MonadAsyncError
}
import monix.eval.Task
-import monix.execution.Cancelable
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.Observable
import org.asynchttpclient.{
AsyncHttpClient,
AsyncHttpClientConfig,
DefaultAsyncHttpClient
}
+import org.reactivestreams.Publisher
import scala.util.{Failure, Success}
-class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)
- extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) {
+class MonixAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)(
+ implicit scheduler: Scheduler)
+ extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]](
+ asyncHttpClient,
+ TaskMonad) {
+
+ override protected def streamBodyToPublisher(
+ s: Observable[ByteBuffer]): Publisher[ByteBuffer] = {
+ s.toReactivePublisher
+ }
- def this() = this(new DefaultAsyncHttpClient())
- def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg))
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Observable[ByteBuffer] =
+ Observable.fromReactivePublisher(p)
}
object MonixAsyncHttpClientHandler {
- def apply(): MonixAsyncHttpClientHandler =
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def apply()(implicit s: Scheduler = Scheduler.Implicits.global)
+ : MonixAsyncHttpClientHandler =
new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient())
- def usingConfig(cfg: AsyncHttpClientConfig): MonixAsyncHttpClientHandler =
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : MonixAsyncHttpClientHandler =
new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient())
- def usingClient(client: AsyncHttpClient): MonixAsyncHttpClientHandler =
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingClient(client: AsyncHttpClient)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : MonixAsyncHttpClientHandler =
new MonixAsyncHttpClientHandler(client)
}