From b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6 Mon Sep 17 00:00:00 2001 From: adamw Date: Mon, 24 Jul 2017 16:57:51 +0200 Subject: Adding streaming to the monix async http client handler --- .../monix/MonixAsyncHttpClientHandler.scala | 49 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 8 deletions(-) (limited to 'async-http-client-handler/monix/src/main') diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala index fab9c98..c77e6d9 100644 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala @@ -1,32 +1,65 @@ package com.softwaremill.sttp.asynchttpclient.monix +import java.nio.ByteBuffer + import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, MonadAsyncError } import monix.eval.Task -import monix.execution.Cancelable +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.Observable import org.asynchttpclient.{ AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient } +import org.reactivestreams.Publisher import scala.util.{Failure, Success} -class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { +class MonixAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)( + implicit scheduler: Scheduler) + extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]]( + asyncHttpClient, + TaskMonad) { + + override protected def streamBodyToPublisher( + s: Observable[ByteBuffer]): Publisher[ByteBuffer] = { + s.toReactivePublisher + } - def this() = this(new DefaultAsyncHttpClient()) - def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Observable[ByteBuffer] = + Observable.fromReactivePublisher(p) } object MonixAsyncHttpClientHandler { - def apply(): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def apply()(implicit s: Scheduler = Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient()) - def usingConfig(cfg: AsyncHttpClientConfig): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler = + Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient()) - def usingClient(client: AsyncHttpClient): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingClient(client: AsyncHttpClient)(implicit s: Scheduler = + Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(client) } -- cgit v1.2.3