diff options
author | adamw <adam@warski.org> | 2017-07-24 16:57:51 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-07-24 16:57:51 +0200 |
commit | b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6 (patch) | |
tree | 6ef95abd69930cabb5b7566507af6dc56d25ebaf /async-http-client-handler/monix/src | |
parent | 95fee5083274bf0e856af8b868702f8965b92f1a (diff) | |
download | sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.gz sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.bz2 sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.zip |
Adding streaming to the monix async http client handler
Diffstat (limited to 'async-http-client-handler/monix/src')
-rw-r--r-- | async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala | 49 |
1 files changed, 41 insertions, 8 deletions
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala index fab9c98..c77e6d9 100644 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala @@ -1,32 +1,65 @@ package com.softwaremill.sttp.asynchttpclient.monix +import java.nio.ByteBuffer + import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, MonadAsyncError } import monix.eval.Task -import monix.execution.Cancelable +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.Observable import org.asynchttpclient.{ AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient } +import org.reactivestreams.Publisher import scala.util.{Failure, Success} -class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { +class MonixAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)( + implicit scheduler: Scheduler) + extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]]( + asyncHttpClient, + TaskMonad) { + + override protected def streamBodyToPublisher( + s: Observable[ByteBuffer]): Publisher[ByteBuffer] = { + s.toReactivePublisher + } - def this() = this(new DefaultAsyncHttpClient()) - def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Observable[ByteBuffer] = + Observable.fromReactivePublisher(p) } object MonixAsyncHttpClientHandler { - def apply(): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def apply()(implicit s: Scheduler = Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient()) - def usingConfig(cfg: AsyncHttpClientConfig): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler = + Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient()) - def usingClient(client: AsyncHttpClient): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingClient(client: AsyncHttpClient)(implicit s: Scheduler = + Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(client) } |