aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala')
-rw-r--r--async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala87
1 files changed, 87 insertions, 0 deletions
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala
new file mode 100644
index 0000000..7f01f0c
--- /dev/null
+++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala
@@ -0,0 +1,87 @@
+package com.softwaremill.sttp.asynchttpclient.monix
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp.{MonadAsyncError, SttpHandler}
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
+import monix.eval.Task
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.Observable
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.util.{Failure, Success}
+
+class AsyncHttpClientMonixHandler private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(implicit scheduler: Scheduler)
+ extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]](
+ asyncHttpClient,
+ TaskMonad,
+ closeClient) {
+
+ override protected def streamBodyToPublisher(
+ s: Observable[ByteBuffer]): Publisher[ByteBuffer] =
+ s.toReactivePublisher
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Observable[ByteBuffer] =
+ Observable.fromReactivePublisher(p)
+}
+
+object AsyncHttpClientMonixHandler {
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def apply()(implicit s: Scheduler = Scheduler.Implicits.global)
+ : SttpHandler[Task, Observable[ByteBuffer]] =
+ new AsyncHttpClientMonixHandler(new DefaultAsyncHttpClient(),
+ closeClient = true)
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : SttpHandler[Task, Observable[ByteBuffer]] =
+ new AsyncHttpClientMonixHandler(new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingClient(client: AsyncHttpClient)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : SttpHandler[Task, Observable[ByteBuffer]] =
+ new AsyncHttpClientMonixHandler(client, closeClient = false)
+}
+
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+}