aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-backend
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-09-14 11:03:21 +0100
committeradamw <adam@warski.org>2017-09-14 11:03:21 +0100
commitfbc71ee712635ed64c50ca694735a84ec794eb11 (patch)
treebf1dd7335306b7f320262d45d0d5b6d02f5a0b27 /async-http-client-backend
parenta971d409cb1063a2089d936abf3d3ab70bbbabb6 (diff)
downloadsttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.gz
sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.bz2
sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.zip
Renaming "handler" to "backend"
Diffstat (limited to 'async-http-client-backend')
-rw-r--r--async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala82
-rw-r--r--async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala112
-rw-r--r--async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala76
-rw-r--r--async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala113
-rw-r--r--async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala79
-rw-r--r--async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala313
6 files changed, 775 insertions, 0 deletions
diff --git a/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala
new file mode 100644
index 0000000..b5beb75
--- /dev/null
+++ b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala
@@ -0,0 +1,82 @@
+package com.softwaremill.sttp.asynchttpclient.cats
+
+import java.nio.ByteBuffer
+
+import cats.effect._
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
+import com.softwaremill.sttp.{
+ FollowRedirectsBackend,
+ MonadAsyncError,
+ SttpBackend
+}
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.duration.FiniteDuration
+import scala.language.higherKinds
+
+class AsyncHttpClientCatsBackend[F[_]: Async] private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean
+) extends AsyncHttpClientBackend[F, Nothing](
+ asyncHttpClient,
+ new AsyncMonad,
+ closeClient
+ ) {
+ override protected def streamBodyToPublisher(
+ s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Nothing =
+ throw new IllegalStateException("This backend does not support streaming")
+
+ override protected def publisherToString(
+ p: Publisher[ByteBuffer]): F[String] =
+ throw new IllegalStateException("This backend does not support streaming")
+}
+
+object AsyncHttpClientCatsBackend {
+
+ private def apply[F[_]: Async](
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean): SttpBackend[F, Nothing] =
+ new FollowRedirectsBackend[F, Nothing](
+ new AsyncHttpClientCatsBackend(asyncHttpClient, closeClient))
+
+ def apply[F[_]: Async](
+ connectionTimeout: FiniteDuration = SttpBackend.DefaultConnectionTimeout)
+ : SttpBackend[F, Nothing] =
+ AsyncHttpClientCatsBackend(
+ AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt),
+ closeClient = true)
+
+ def usingConfig[F[_]: Async](
+ cfg: AsyncHttpClientConfig): SttpBackend[F, Nothing] =
+ AsyncHttpClientCatsBackend(new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ def usingClient[F[_]: Async](
+ client: AsyncHttpClient): SttpBackend[F, Nothing] =
+ AsyncHttpClientCatsBackend(client, closeClient = false)
+}
+
+private[cats] class AsyncMonad[F[_]](implicit F: Async[F])
+ extends MonadAsyncError[F] {
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
+ F.async(register)
+
+ override def unit[T](t: T): F[T] = F.pure(t)
+
+ override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
+
+ override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
+ F.flatMap(fa)(f)
+
+ override def error[T](t: Throwable): F[T] = F.raiseError(t)
+}
diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
new file mode 100644
index 0000000..90db69c
--- /dev/null
+++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala
@@ -0,0 +1,112 @@
+package com.softwaremill.sttp.asynchttpclient.fs2
+
+import java.nio.ByteBuffer
+
+import cats.effect._
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
+import com.softwaremill.sttp.{
+ FollowRedirectsBackend,
+ MonadAsyncError,
+ SttpBackend,
+ Utf8,
+ concatByteBuffers
+}
+import fs2._
+import fs2.interop.reactivestreams._
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import scala.language.higherKinds
+
+class AsyncHttpClientFs2Backend[F[_]: Effect] private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(implicit ec: ExecutionContext)
+ extends AsyncHttpClientBackend[F, Stream[F, ByteBuffer]](
+ asyncHttpClient,
+ new EffectMonad,
+ closeClient
+ ) {
+
+ override protected def streamBodyToPublisher(
+ s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] =
+ s.toUnicastPublisher
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] =
+ p.toStream[F]
+
+ override protected def publisherToString(
+ p: Publisher[ByteBuffer]): F[String] = {
+ val bytes = p
+ .toStream[F]
+ .runFold(ByteBuffer.allocate(0))(concatByteBuffers)
+
+ implicitly[Effect[F]].map(bytes)(bb => new String(bb.array(), Utf8))
+ }
+}
+
+object AsyncHttpClientFs2Backend {
+
+ private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(
+ implicit ec: ExecutionContext): SttpBackend[F, Stream[F, ByteBuffer]] =
+ new FollowRedirectsBackend(
+ new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient))
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def apply[F[_]: Effect](connectionTimeout: FiniteDuration =
+ SttpBackend.DefaultConnectionTimeout)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](
+ AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingClient[F[_]: Effect](client: AsyncHttpClient)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[F, Stream[F, ByteBuffer]] =
+ AsyncHttpClientFs2Backend[F](client, closeClient = false)
+}
+
+private[fs2] class EffectMonad[F[_]](implicit F: Effect[F])
+ extends MonadAsyncError[F] {
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
+ F.async(register)
+
+ override def unit[T](t: T): F[T] = F.pure(t)
+
+ override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
+
+ override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
+ F.flatMap(fa)(f)
+
+ override def error[T](t: Throwable): F[T] = F.raiseError(t)
+}
diff --git a/async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala b/async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala
new file mode 100644
index 0000000..a46ed0d
--- /dev/null
+++ b/async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala
@@ -0,0 +1,76 @@
+package com.softwaremill.sttp.asynchttpclient.future
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
+import com.softwaremill.sttp.{FollowRedirectsBackend, FutureMonad, SttpBackend}
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsyncHttpClientFutureBackend private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(implicit ec: ExecutionContext)
+ extends AsyncHttpClientBackend[Future, Nothing](asyncHttpClient,
+ new FutureMonad,
+ closeClient) {
+
+ override protected def streamBodyToPublisher(
+ s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Nothing =
+ throw new IllegalStateException("This backend does not support streaming")
+
+ override protected def publisherToString(
+ p: Publisher[ByteBuffer]): Future[String] =
+ throw new IllegalStateException("This backend does not support streaming")
+}
+
+object AsyncHttpClientFutureBackend {
+
+ private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
+ implicit ec: ExecutionContext): SttpBackend[Future, Nothing] =
+ new FollowRedirectsBackend[Future, Nothing](
+ new AsyncHttpClientFutureBackend(asyncHttpClient, closeClient))
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def apply(connectionTimeout: FiniteDuration =
+ SttpBackend.DefaultConnectionTimeout)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[Future, Nothing] =
+ AsyncHttpClientFutureBackend(
+ AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingConfig(cfg: AsyncHttpClientConfig)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpBackend[Future, Nothing] =
+ AsyncHttpClientFutureBackend(new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext =
+ ExecutionContext.Implicits.global)
+ : SttpBackend[Future, Nothing] =
+ AsyncHttpClientFutureBackend(client, closeClient = false)
+}
diff --git a/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala
new file mode 100644
index 0000000..c08c244
--- /dev/null
+++ b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala
@@ -0,0 +1,113 @@
+package com.softwaremill.sttp.asynchttpclient.monix
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp.{
+ FollowRedirectsBackend,
+ MonadAsyncError,
+ SttpBackend,
+ Utf8,
+ concatByteBuffers
+}
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
+import monix.eval.Task
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.Observable
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.duration.FiniteDuration
+import scala.util.{Failure, Success}
+
+class AsyncHttpClientMonixBackend private (
+ asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)(implicit scheduler: Scheduler)
+ extends AsyncHttpClientBackend[Task, Observable[ByteBuffer]](
+ asyncHttpClient,
+ TaskMonad,
+ closeClient) {
+
+ override protected def streamBodyToPublisher(
+ s: Observable[ByteBuffer]): Publisher[ByteBuffer] =
+ s.toReactivePublisher
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Observable[ByteBuffer] =
+ Observable.fromReactivePublisher(p)
+
+ override protected def publisherToString(
+ p: Publisher[ByteBuffer]): Task[String] = {
+
+ val bytes = Observable
+ .fromReactivePublisher(p)
+ .foldLeftL(ByteBuffer.allocate(0))(concatByteBuffers)
+
+ bytes.map(bb => new String(bb.array(), Utf8))
+ }
+}
+
+object AsyncHttpClientMonixBackend {
+
+ private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
+ implicit scheduler: Scheduler)
+ : SttpBackend[Task, Observable[ByteBuffer]] =
+ new FollowRedirectsBackend(
+ new AsyncHttpClientMonixBackend(asyncHttpClient, closeClient))
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def apply(connectionTimeout: FiniteDuration =
+ SttpBackend.DefaultConnectionTimeout)(
+ implicit s: Scheduler = Scheduler.Implicits.global)
+ : SttpBackend[Task, Observable[ByteBuffer]] =
+ AsyncHttpClientMonixBackend(
+ AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt),
+ closeClient = true)
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : SttpBackend[Task, Observable[ByteBuffer]] =
+ AsyncHttpClientMonixBackend(new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingClient(client: AsyncHttpClient)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : SttpBackend[Task, Observable[ByteBuffer]] =
+ AsyncHttpClientMonixBackend(client, closeClient = false)
+}
+
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+}
diff --git a/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala
new file mode 100644
index 0000000..12e217b
--- /dev/null
+++ b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala
@@ -0,0 +1,79 @@
+package com.softwaremill.sttp.asynchttpclient.scalaz
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp.{
+ FollowRedirectsBackend,
+ MonadAsyncError,
+ SttpBackend
+}
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend
+import org.asynchttpclient.{
+ AsyncHttpClient,
+ AsyncHttpClientConfig,
+ DefaultAsyncHttpClient
+}
+import org.reactivestreams.Publisher
+
+import scala.concurrent.duration.FiniteDuration
+import scalaz.{-\/, \/-}
+import scalaz.concurrent.Task
+
+class AsyncHttpClientScalazBackend private (asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean)
+ extends AsyncHttpClientBackend[Task, Nothing](asyncHttpClient,
+ TaskMonad,
+ closeClient) {
+
+ override protected def streamBodyToPublisher(
+ s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Nothing =
+ throw new IllegalStateException("This backend does not support streaming")
+
+ override protected def publisherToString(
+ p: Publisher[ByteBuffer]): Task[String] =
+ throw new IllegalStateException("This backend does not support streaming")
+}
+
+object AsyncHttpClientScalazBackend {
+ private def apply(asyncHttpClient: AsyncHttpClient,
+ closeClient: Boolean): SttpBackend[Task, Nothing] =
+ new FollowRedirectsBackend[Task, Nothing](
+ new AsyncHttpClientScalazBackend(asyncHttpClient, closeClient))
+
+ def apply(
+ connectionTimeout: FiniteDuration = SttpBackend.DefaultConnectionTimeout)
+ : SttpBackend[Task, Nothing] =
+ AsyncHttpClientScalazBackend(
+ AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt),
+ closeClient = true)
+
+ def usingConfig(cfg: AsyncHttpClientConfig): SttpBackend[Task, Nothing] =
+ AsyncHttpClientScalazBackend(new DefaultAsyncHttpClient(cfg),
+ closeClient = true)
+
+ def usingClient(client: AsyncHttpClient): SttpBackend[Task, Nothing] =
+ AsyncHttpClientScalazBackend(client, closeClient = false)
+}
+
+private[scalaz] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.point(t)
+
+ override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { cb =>
+ register {
+ case Left(t) => cb(-\/(t))
+ case Right(t) => cb(\/-(t))
+ }
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.fail(t)
+}
diff --git a/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala
new file mode 100644
index 0000000..87a3965
--- /dev/null
+++ b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala
@@ -0,0 +1,313 @@
+package com.softwaremill.sttp.asynchttpclient
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+
+import com.softwaremill.sttp.ResponseAs.EagerResponseHandler
+import com.softwaremill.sttp._
+import org.asynchttpclient.AsyncHandler.State
+import org.asynchttpclient.handler.StreamedAsyncHandler
+import org.asynchttpclient.request.body.multipart.{
+ ByteArrayPart,
+ FilePart,
+ StringPart
+}
+import org.asynchttpclient.{
+ AsyncCompletionHandler,
+ AsyncHandler,
+ AsyncHttpClient,
+ DefaultAsyncHttpClientConfig,
+ DefaultAsyncHttpClient,
+ HttpResponseBodyPart,
+ HttpResponseHeaders,
+ HttpResponseStatus,
+ Param,
+ RequestBuilder,
+ Request => AsyncRequest,
+ Response => AsyncResponse
+}
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+
+import scala.collection.JavaConverters._
+import scala.language.higherKinds
+import scala.util.{Failure, Try}
+
+abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient,
+ rm: MonadAsyncError[R],
+ closeClient: Boolean)
+ extends SttpBackend[R, S] {
+
+ override def send[T](r: Request[T, S]): R[Response[T]] = {
+ val preparedRequest = asyncHttpClient
+ .prepareRequest(requestToAsync(r))
+
+ rm.flatten(rm.async[R[Response[T]]] { cb =>
+ def success(r: R[Response[T]]) = cb(Right(r))
+ def error(t: Throwable) = cb(Left(t))
+
+ r.response match {
+ case ras @ ResponseAsStream() =>
+ preparedRequest
+ .execute(streamingAsyncHandler(ras, success, error))
+
+ case ra =>
+ preparedRequest
+ .execute(eagerAsyncHandler(ra, success, error))
+ }
+ })
+ }
+
+ override def responseMonad: MonadError[R] = rm
+
+ protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer]
+
+ protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S
+
+ protected def publisherToString(p: Publisher[ByteBuffer]): R[String]
+
+ private def eagerAsyncHandler[T](
+ responseAs: ResponseAs[T, S],
+ success: R[Response[T]] => Unit,
+ error: Throwable => Unit): AsyncHandler[Unit] = {
+
+ new AsyncCompletionHandler[Unit] {
+ override def onCompleted(response: AsyncResponse): Unit =
+ success(readEagerResponse(response, responseAs))
+
+ override def onThrowable(t: Throwable): Unit = error(t)
+ }
+ }
+
+ private def streamingAsyncHandler[T](
+ responseAs: ResponseAsStream[T, S],
+ success: R[Response[T]] => Unit,
+ error: Throwable => Unit): AsyncHandler[Unit] = {
+ new StreamedAsyncHandler[Unit] {
+ private val builder = new AsyncResponse.ResponseBuilder()
+ private var publisher: Option[Publisher[ByteBuffer]] = None
+ private var completed = false
+
+ override def onStream(
+ p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = {
+ // Sadly we don't have .map on Publisher
+ publisher = Some(new Publisher[ByteBuffer] {
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit =
+ p.subscribe(new Subscriber[HttpResponseBodyPart] {
+ override def onError(t: Throwable): Unit = s.onError(t)
+ override def onComplete(): Unit = s.onComplete()
+ override def onNext(t: HttpResponseBodyPart): Unit =
+ s.onNext(t.getBodyByteBuffer)
+ override def onSubscribe(v: Subscription): Unit =
+ s.onSubscribe(v)
+ })
+ })
+ // #2: sometimes onCompleted() isn't called, only onStream(); this
+ // seems to be true esp for https sites. For these cases, completing
+ // the request here.
+ doComplete()
+ State.CONTINUE
+ }
+
+ override def onBodyPartReceived(
+ bodyPart: HttpResponseBodyPart): AsyncHandler.State =
+ throw new IllegalStateException(
+ "Requested a streaming backend, unexpected eager body parts.")
+
+ override def onHeadersReceived(
+ headers: HttpResponseHeaders): AsyncHandler.State = {
+ builder.accumulate(headers)
+ State.CONTINUE
+ }
+
+ override def onStatusReceived(
+ responseStatus: HttpResponseStatus): AsyncHandler.State = {
+ builder.accumulate(responseStatus)
+ State.CONTINUE
+ }
+
+ override def onCompleted(): Unit = {
+ // if the request had no body, onStream() will never be called
+ doComplete()
+ }
+
+ private def doComplete(): Unit = {
+ if (!completed) {
+ completed = true
+
+ val baseResponse = readResponseNoBody(builder.build())
+ val p = publisher.getOrElse(EmptyPublisher)
+ val s = publisherToStreamBody(p)
+ val b = if (codeIsSuccess(baseResponse.code)) {
+ rm.unit(Right(responseAs.responseIsStream(s)))
+ } else {
+ rm.map(publisherToString(p))(Left(_))
+ }
+
+ success(rm.map(b) { bb: Either[String, T] =>
+ baseResponse.copy(body = bb)
+ })
+ }
+ }
+
+ override def onThrowable(t: Throwable): Unit = {
+ error(t)
+ }
+ }
+ }
+
+ private def requestToAsync(r: Request[_, S]): AsyncRequest = {
+ val readTimeout = r.options.readTimeout
+ val rb = new RequestBuilder(r.method.m)
+ .setUrl(r.uri.toString)
+ .setRequestTimeout(
+ if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1)
+ r.headers.foreach { case (k, v) => rb.setHeader(k, v) }
+ setBody(r, r.body, rb)
+ rb.build()
+ }
+
+ private def setBody(r: Request[_, S],
+ body: RequestBody[S],
+ rb: RequestBuilder): Unit = {
+ body match {
+ case NoBody => // skip
+
+ case StringBody(b, encoding, _) =>
+ rb.setBody(b.getBytes(encoding))
+
+ case ByteArrayBody(b, _) =>
+ rb.setBody(b)
+
+ case ByteBufferBody(b, _) =>
+ rb.setBody(b)
+
+ case InputStreamBody(b, _) =>
+ rb.setBody(b)
+
+ case PathBody(b, _) =>
+ rb.setBody(b.toFile)
+
+ case StreamBody(s) =>
+ val cl = r.headers
+ .find(_._1.equalsIgnoreCase(ContentLengthHeader))
+ .map(_._2.toLong)
+ .getOrElse(-1L)
+ rb.setBody(streamBodyToPublisher(s), cl)
+
+ case MultipartBody(ps) =>
+ ps.foreach(addMultipartBody(rb, _))
+ }
+ }
+
+ private def addMultipartBody(rb: RequestBuilder, mp: Multipart): Unit = {
+ // async http client only supports setting file names on file parts. To
+ // set a file name on an arbitrary part we have to use a small "work
+ // around", combining the file name with the name (surrounding quotes
+ // are added by ahc).
+ def nameWithFilename = mp.fileName.fold(mp.name) { fn =>
+ s"""${mp.name}"; filename="$fn"""
+ }
+
+ val bodyPart = mp.body match {
+ case StringBody(b, encoding, _) =>
+ new StringPart(nameWithFilename,
+ b,
+ mp.contentType.getOrElse(TextPlainContentType),
+ Charset.forName(encoding))
+ case ByteArrayBody(b, _) =>
+ new ByteArrayPart(nameWithFilename, b)
+ case ByteBufferBody(b, _) =>
+ new ByteArrayPart(nameWithFilename, b.array())
+ case InputStreamBody(b, _) =>
+ // sadly async http client only supports parts that are strings,
+ // byte arrays or files
+ val baos = new ByteArrayOutputStream()
+ transfer(b, baos)
+ new ByteArrayPart(nameWithFilename, baos.toByteArray)
+ case PathBody(b, _) =>
+ new FilePart(mp.name, b.toFile, null, null, mp.fileName.orNull)
+ }
+
+ bodyPart.setCustomHeaders(
+ mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava)
+
+ rb.addBodyPart(bodyPart)
+ }
+
+ private def readEagerResponse[T](
+ response: AsyncResponse,
+ responseAs: ResponseAs[T, S]): R[Response[T]] = {
+ val base = readResponseNoBody(response)
+
+ val body = if (codeIsSuccess(base.code)) {
+ rm.map(eagerResponseHandler(response).handle(responseAs, rm))(Right(_))
+ } else {
+ rm.map(eagerResponseHandler(response).handle(asString, rm))(Left(_))
+ }
+
+ rm.map(body) { b: Either[String, T] =>
+ base.copy(body = b)
+ }
+ }
+
+ private def readResponseNoBody(response: AsyncResponse): Response[Unit] = {
+ Response(Right(()),
+ response.getStatusCode,
+ response.getHeaders
+ .iterator()
+ .asScala
+ .map(e => (e.getKey, e.getValue))
+ .toList,
+ Nil)
+ }
+
+ private def eagerResponseHandler(response: AsyncResponse) =
+ new EagerResponseHandler[S] {
+ override def handleBasic[T](bra: BasicResponseAs[T, S]): Try[T] =
+ bra match {
+ case IgnoreResponse =>
+ // getting the body and discarding it
+ response.getResponseBodyAsBytes
+ Try(())
+
+ case ResponseAsString(enc) =>
+ Try(response.getResponseBody(Charset.forName(enc)))
+
+ case ResponseAsByteArray =>
+ Try(response.getResponseBodyAsBytes)
+
+ case ResponseAsStream() =>
+ Failure(
+ new IllegalStateException(
+ "Requested a streaming response, trying to read eagerly."))
+
+ case ResponseAsFile(file, overwrite) =>
+ Try(
+ ResponseAs
+ .saveFile(file, response.getResponseBodyAsStream, overwrite))
+ }
+ }
+
+ override def close(): Unit = {
+ if (closeClient)
+ asyncHttpClient.close()
+ }
+}
+
+object AsyncHttpClientBackend {
+
+ private[asynchttpclient] def defaultClient(
+ connectionTimeout: Int): AsyncHttpClient =
+ new DefaultAsyncHttpClient(
+ new DefaultAsyncHttpClientConfig.Builder()
+ .setConnectTimeout(connectionTimeout)
+ .build()
+ )
+}
+
+object EmptyPublisher extends Publisher[ByteBuffer] {
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {
+ s.onComplete()
+ }
+}