diff options
Diffstat (limited to 'async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala')
-rw-r--r-- | async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala new file mode 100644 index 0000000..7f01f0c --- /dev/null +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala @@ -0,0 +1,87 @@ +package com.softwaremill.sttp.asynchttpclient.monix + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.{MonadAsyncError, SttpHandler} +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler +import monix.eval.Task +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.Observable +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.util.{Failure, Success} + +class AsyncHttpClientMonixHandler private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)(implicit scheduler: Scheduler) + extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]]( + asyncHttpClient, + TaskMonad, + closeClient) { + + override protected def streamBodyToPublisher( + s: Observable[ByteBuffer]): Publisher[ByteBuffer] = + s.toReactivePublisher + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Observable[ByteBuffer] = + Observable.fromReactivePublisher(p) +} + +object AsyncHttpClientMonixHandler { + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def apply()(implicit s: Scheduler = Scheduler.Implicits.global) + : SttpHandler[Task, Observable[ByteBuffer]] = + new AsyncHttpClientMonixHandler(new DefaultAsyncHttpClient(), + closeClient = true) + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler = + Scheduler.Implicits.global) + : SttpHandler[Task, Observable[ByteBuffer]] = + new AsyncHttpClientMonixHandler(new DefaultAsyncHttpClient(cfg), + closeClient = true) + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingClient(client: AsyncHttpClient)(implicit s: Scheduler = + Scheduler.Implicits.global) + : SttpHandler[Task, Observable[ByteBuffer]] = + new AsyncHttpClientMonixHandler(client, closeClient = false) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} |