diff options
author | adamw <adam@warski.org> | 2017-09-14 11:03:21 +0100 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-09-14 11:03:21 +0100 |
commit | fbc71ee712635ed64c50ca694735a84ec794eb11 (patch) | |
tree | bf1dd7335306b7f320262d45d0d5b6d02f5a0b27 /async-http-client-backend | |
parent | a971d409cb1063a2089d936abf3d3ab70bbbabb6 (diff) | |
download | sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.gz sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.bz2 sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.zip |
Renaming "handler" to "backend"
Diffstat (limited to 'async-http-client-backend')
6 files changed, 775 insertions, 0 deletions
diff --git a/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala new file mode 100644 index 0000000..b5beb75 --- /dev/null +++ b/async-http-client-backend/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsBackend.scala @@ -0,0 +1,82 @@ +package com.softwaremill.sttp.asynchttpclient.cats + +import java.nio.ByteBuffer + +import cats.effect._ +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend +import com.softwaremill.sttp.{ + FollowRedirectsBackend, + MonadAsyncError, + SttpBackend +} +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.duration.FiniteDuration +import scala.language.higherKinds + +class AsyncHttpClientCatsBackend[F[_]: Async] private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean +) extends AsyncHttpClientBackend[F, Nothing]( + asyncHttpClient, + new AsyncMonad, + closeClient + ) { + override protected def streamBodyToPublisher( + s: Nothing): Publisher[ByteBuffer] = s // nothing is everything + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Nothing = + throw new IllegalStateException("This backend does not support streaming") + + override protected def publisherToString( + p: Publisher[ByteBuffer]): F[String] = + throw new IllegalStateException("This backend does not support streaming") +} + +object AsyncHttpClientCatsBackend { + + private def apply[F[_]: Async]( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean): SttpBackend[F, Nothing] = + new FollowRedirectsBackend[F, Nothing]( + new AsyncHttpClientCatsBackend(asyncHttpClient, closeClient)) + + def apply[F[_]: Async]( + connectionTimeout: FiniteDuration = SttpBackend.DefaultConnectionTimeout) + : SttpBackend[F, Nothing] = + AsyncHttpClientCatsBackend( + AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt), + closeClient = true) + + def usingConfig[F[_]: Async]( + cfg: AsyncHttpClientConfig): SttpBackend[F, Nothing] = + AsyncHttpClientCatsBackend(new DefaultAsyncHttpClient(cfg), + closeClient = true) + + def usingClient[F[_]: Async]( + client: AsyncHttpClient): SttpBackend[F, Nothing] = + AsyncHttpClientCatsBackend(client, closeClient = false) +} + +private[cats] class AsyncMonad[F[_]](implicit F: Async[F]) + extends MonadAsyncError[F] { + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = + F.async(register) + + override def unit[T](t: T): F[T] = F.pure(t) + + override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) + + override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = + F.flatMap(fa)(f) + + override def error[T](t: Throwable): F[T] = F.raiseError(t) +} diff --git a/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala new file mode 100644 index 0000000..90db69c --- /dev/null +++ b/async-http-client-backend/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Backend.scala @@ -0,0 +1,112 @@ +package com.softwaremill.sttp.asynchttpclient.fs2 + +import java.nio.ByteBuffer + +import cats.effect._ +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend +import com.softwaremill.sttp.{ + FollowRedirectsBackend, + MonadAsyncError, + SttpBackend, + Utf8, + concatByteBuffers +} +import fs2._ +import fs2.interop.reactivestreams._ +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration +import scala.language.higherKinds + +class AsyncHttpClientFs2Backend[F[_]: Effect] private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)(implicit ec: ExecutionContext) + extends AsyncHttpClientBackend[F, Stream[F, ByteBuffer]]( + asyncHttpClient, + new EffectMonad, + closeClient + ) { + + override protected def streamBodyToPublisher( + s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] = + s.toUnicastPublisher + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] = + p.toStream[F] + + override protected def publisherToString( + p: Publisher[ByteBuffer]): F[String] = { + val bytes = p + .toStream[F] + .runFold(ByteBuffer.allocate(0))(concatByteBuffers) + + implicitly[Effect[F]].map(bytes)(bb => new String(bb.array(), Utf8)) + } +} + +object AsyncHttpClientFs2Backend { + + private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)( + implicit ec: ExecutionContext): SttpBackend[F, Stream[F, ByteBuffer]] = + new FollowRedirectsBackend( + new AsyncHttpClientFs2Backend(asyncHttpClient, closeClient)) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def apply[F[_]: Effect](connectionTimeout: FiniteDuration = + SttpBackend.DefaultConnectionTimeout)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F]( + AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F](new DefaultAsyncHttpClient(cfg), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingClient[F[_]: Effect](client: AsyncHttpClient)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[F, Stream[F, ByteBuffer]] = + AsyncHttpClientFs2Backend[F](client, closeClient = false) +} + +private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) + extends MonadAsyncError[F] { + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = + F.async(register) + + override def unit[T](t: T): F[T] = F.pure(t) + + override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) + + override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = + F.flatMap(fa)(f) + + override def error[T](t: Throwable): F[T] = F.raiseError(t) +} diff --git a/async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala b/async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala new file mode 100644 index 0000000..a46ed0d --- /dev/null +++ b/async-http-client-backend/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureBackend.scala @@ -0,0 +1,76 @@ +package com.softwaremill.sttp.asynchttpclient.future + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend +import com.softwaremill.sttp.{FollowRedirectsBackend, FutureMonad, SttpBackend} +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} + +class AsyncHttpClientFutureBackend private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)(implicit ec: ExecutionContext) + extends AsyncHttpClientBackend[Future, Nothing](asyncHttpClient, + new FutureMonad, + closeClient) { + + override protected def streamBodyToPublisher( + s: Nothing): Publisher[ByteBuffer] = s // nothing is everything + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Nothing = + throw new IllegalStateException("This backend does not support streaming") + + override protected def publisherToString( + p: Publisher[ByteBuffer]): Future[String] = + throw new IllegalStateException("This backend does not support streaming") +} + +object AsyncHttpClientFutureBackend { + + private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( + implicit ec: ExecutionContext): SttpBackend[Future, Nothing] = + new FollowRedirectsBackend[Future, Nothing]( + new AsyncHttpClientFutureBackend(asyncHttpClient, closeClient)) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def apply(connectionTimeout: FiniteDuration = + SttpBackend.DefaultConnectionTimeout)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[Future, Nothing] = + AsyncHttpClientFutureBackend( + AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingConfig(cfg: AsyncHttpClientConfig)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpBackend[Future, Nothing] = + AsyncHttpClientFutureBackend(new DefaultAsyncHttpClient(cfg), + closeClient = true) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext = + ExecutionContext.Implicits.global) + : SttpBackend[Future, Nothing] = + AsyncHttpClientFutureBackend(client, closeClient = false) +} diff --git a/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala new file mode 100644 index 0000000..c08c244 --- /dev/null +++ b/async-http-client-backend/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixBackend.scala @@ -0,0 +1,113 @@ +package com.softwaremill.sttp.asynchttpclient.monix + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.{ + FollowRedirectsBackend, + MonadAsyncError, + SttpBackend, + Utf8, + concatByteBuffers +} +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend +import monix.eval.Task +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.Observable +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.duration.FiniteDuration +import scala.util.{Failure, Success} + +class AsyncHttpClientMonixBackend private ( + asyncHttpClient: AsyncHttpClient, + closeClient: Boolean)(implicit scheduler: Scheduler) + extends AsyncHttpClientBackend[Task, Observable[ByteBuffer]]( + asyncHttpClient, + TaskMonad, + closeClient) { + + override protected def streamBodyToPublisher( + s: Observable[ByteBuffer]): Publisher[ByteBuffer] = + s.toReactivePublisher + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Observable[ByteBuffer] = + Observable.fromReactivePublisher(p) + + override protected def publisherToString( + p: Publisher[ByteBuffer]): Task[String] = { + + val bytes = Observable + .fromReactivePublisher(p) + .foldLeftL(ByteBuffer.allocate(0))(concatByteBuffers) + + bytes.map(bb => new String(bb.array(), Utf8)) + } +} + +object AsyncHttpClientMonixBackend { + + private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( + implicit scheduler: Scheduler) + : SttpBackend[Task, Observable[ByteBuffer]] = + new FollowRedirectsBackend( + new AsyncHttpClientMonixBackend(asyncHttpClient, closeClient)) + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def apply(connectionTimeout: FiniteDuration = + SttpBackend.DefaultConnectionTimeout)( + implicit s: Scheduler = Scheduler.Implicits.global) + : SttpBackend[Task, Observable[ByteBuffer]] = + AsyncHttpClientMonixBackend( + AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt), + closeClient = true) + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler = + Scheduler.Implicits.global) + : SttpBackend[Task, Observable[ByteBuffer]] = + AsyncHttpClientMonixBackend(new DefaultAsyncHttpClient(cfg), + closeClient = true) + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingClient(client: AsyncHttpClient)(implicit s: Scheduler = + Scheduler.Implicits.global) + : SttpBackend[Task, Observable[ByteBuffer]] = + AsyncHttpClientMonixBackend(client, closeClient = false) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} diff --git a/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala new file mode 100644 index 0000000..12e217b --- /dev/null +++ b/async-http-client-backend/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazBackend.scala @@ -0,0 +1,79 @@ +package com.softwaremill.sttp.asynchttpclient.scalaz + +import java.nio.ByteBuffer + +import com.softwaremill.sttp.{ + FollowRedirectsBackend, + MonadAsyncError, + SttpBackend +} +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientBackend +import org.asynchttpclient.{ + AsyncHttpClient, + AsyncHttpClientConfig, + DefaultAsyncHttpClient +} +import org.reactivestreams.Publisher + +import scala.concurrent.duration.FiniteDuration +import scalaz.{-\/, \/-} +import scalaz.concurrent.Task + +class AsyncHttpClientScalazBackend private (asyncHttpClient: AsyncHttpClient, + closeClient: Boolean) + extends AsyncHttpClientBackend[Task, Nothing](asyncHttpClient, + TaskMonad, + closeClient) { + + override protected def streamBodyToPublisher( + s: Nothing): Publisher[ByteBuffer] = s // nothing is everything + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Nothing = + throw new IllegalStateException("This backend does not support streaming") + + override protected def publisherToString( + p: Publisher[ByteBuffer]): Task[String] = + throw new IllegalStateException("This backend does not support streaming") +} + +object AsyncHttpClientScalazBackend { + private def apply(asyncHttpClient: AsyncHttpClient, + closeClient: Boolean): SttpBackend[Task, Nothing] = + new FollowRedirectsBackend[Task, Nothing]( + new AsyncHttpClientScalazBackend(asyncHttpClient, closeClient)) + + def apply( + connectionTimeout: FiniteDuration = SttpBackend.DefaultConnectionTimeout) + : SttpBackend[Task, Nothing] = + AsyncHttpClientScalazBackend( + AsyncHttpClientBackend.defaultClient(connectionTimeout.toMillis.toInt), + closeClient = true) + + def usingConfig(cfg: AsyncHttpClientConfig): SttpBackend[Task, Nothing] = + AsyncHttpClientScalazBackend(new DefaultAsyncHttpClient(cfg), + closeClient = true) + + def usingClient(client: AsyncHttpClient): SttpBackend[Task, Nothing] = + AsyncHttpClientScalazBackend(client, closeClient = false) +} + +private[scalaz] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.point(t) + + override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { cb => + register { + case Left(t) => cb(-\/(t)) + case Right(t) => cb(\/-(t)) + } + } + + override def error[T](t: Throwable): Task[T] = Task.fail(t) +} diff --git a/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala new file mode 100644 index 0000000..87a3965 --- /dev/null +++ b/async-http-client-backend/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientBackend.scala @@ -0,0 +1,313 @@ +package com.softwaremill.sttp.asynchttpclient + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.charset.Charset + +import com.softwaremill.sttp.ResponseAs.EagerResponseHandler +import com.softwaremill.sttp._ +import org.asynchttpclient.AsyncHandler.State +import org.asynchttpclient.handler.StreamedAsyncHandler +import org.asynchttpclient.request.body.multipart.{ + ByteArrayPart, + FilePart, + StringPart +} +import org.asynchttpclient.{ + AsyncCompletionHandler, + AsyncHandler, + AsyncHttpClient, + DefaultAsyncHttpClientConfig, + DefaultAsyncHttpClient, + HttpResponseBodyPart, + HttpResponseHeaders, + HttpResponseStatus, + Param, + RequestBuilder, + Request => AsyncRequest, + Response => AsyncResponse +} +import org.reactivestreams.{Publisher, Subscriber, Subscription} + +import scala.collection.JavaConverters._ +import scala.language.higherKinds +import scala.util.{Failure, Try} + +abstract class AsyncHttpClientBackend[R[_], S](asyncHttpClient: AsyncHttpClient, + rm: MonadAsyncError[R], + closeClient: Boolean) + extends SttpBackend[R, S] { + + override def send[T](r: Request[T, S]): R[Response[T]] = { + val preparedRequest = asyncHttpClient + .prepareRequest(requestToAsync(r)) + + rm.flatten(rm.async[R[Response[T]]] { cb => + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) + + r.response match { + case ras @ ResponseAsStream() => + preparedRequest + .execute(streamingAsyncHandler(ras, success, error)) + + case ra => + preparedRequest + .execute(eagerAsyncHandler(ra, success, error)) + } + }) + } + + override def responseMonad: MonadError[R] = rm + + protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer] + + protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S + + protected def publisherToString(p: Publisher[ByteBuffer]): R[String] + + private def eagerAsyncHandler[T]( + responseAs: ResponseAs[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { + + new AsyncCompletionHandler[Unit] { + override def onCompleted(response: AsyncResponse): Unit = + success(readEagerResponse(response, responseAs)) + + override def onThrowable(t: Throwable): Unit = error(t) + } + } + + private def streamingAsyncHandler[T]( + responseAs: ResponseAsStream[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { + new StreamedAsyncHandler[Unit] { + private val builder = new AsyncResponse.ResponseBuilder() + private var publisher: Option[Publisher[ByteBuffer]] = None + private var completed = false + + override def onStream( + p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { + // Sadly we don't have .map on Publisher + publisher = Some(new Publisher[ByteBuffer] { + override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = + p.subscribe(new Subscriber[HttpResponseBodyPart] { + override def onError(t: Throwable): Unit = s.onError(t) + override def onComplete(): Unit = s.onComplete() + override def onNext(t: HttpResponseBodyPart): Unit = + s.onNext(t.getBodyByteBuffer) + override def onSubscribe(v: Subscription): Unit = + s.onSubscribe(v) + }) + }) + // #2: sometimes onCompleted() isn't called, only onStream(); this + // seems to be true esp for https sites. For these cases, completing + // the request here. + doComplete() + State.CONTINUE + } + + override def onBodyPartReceived( + bodyPart: HttpResponseBodyPart): AsyncHandler.State = + throw new IllegalStateException( + "Requested a streaming backend, unexpected eager body parts.") + + override def onHeadersReceived( + headers: HttpResponseHeaders): AsyncHandler.State = { + builder.accumulate(headers) + State.CONTINUE + } + + override def onStatusReceived( + responseStatus: HttpResponseStatus): AsyncHandler.State = { + builder.accumulate(responseStatus) + State.CONTINUE + } + + override def onCompleted(): Unit = { + // if the request had no body, onStream() will never be called + doComplete() + } + + private def doComplete(): Unit = { + if (!completed) { + completed = true + + val baseResponse = readResponseNoBody(builder.build()) + val p = publisher.getOrElse(EmptyPublisher) + val s = publisherToStreamBody(p) + val b = if (codeIsSuccess(baseResponse.code)) { + rm.unit(Right(responseAs.responseIsStream(s))) + } else { + rm.map(publisherToString(p))(Left(_)) + } + + success(rm.map(b) { bb: Either[String, T] => + baseResponse.copy(body = bb) + }) + } + } + + override def onThrowable(t: Throwable): Unit = { + error(t) + } + } + } + + private def requestToAsync(r: Request[_, S]): AsyncRequest = { + val readTimeout = r.options.readTimeout + val rb = new RequestBuilder(r.method.m) + .setUrl(r.uri.toString) + .setRequestTimeout( + if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1) + r.headers.foreach { case (k, v) => rb.setHeader(k, v) } + setBody(r, r.body, rb) + rb.build() + } + + private def setBody(r: Request[_, S], + body: RequestBody[S], + rb: RequestBuilder): Unit = { + body match { + case NoBody => // skip + + case StringBody(b, encoding, _) => + rb.setBody(b.getBytes(encoding)) + + case ByteArrayBody(b, _) => + rb.setBody(b) + + case ByteBufferBody(b, _) => + rb.setBody(b) + + case InputStreamBody(b, _) => + rb.setBody(b) + + case PathBody(b, _) => + rb.setBody(b.toFile) + + case StreamBody(s) => + val cl = r.headers + .find(_._1.equalsIgnoreCase(ContentLengthHeader)) + .map(_._2.toLong) + .getOrElse(-1L) + rb.setBody(streamBodyToPublisher(s), cl) + + case MultipartBody(ps) => + ps.foreach(addMultipartBody(rb, _)) + } + } + + private def addMultipartBody(rb: RequestBuilder, mp: Multipart): Unit = { + // async http client only supports setting file names on file parts. To + // set a file name on an arbitrary part we have to use a small "work + // around", combining the file name with the name (surrounding quotes + // are added by ahc). + def nameWithFilename = mp.fileName.fold(mp.name) { fn => + s"""${mp.name}"; filename="$fn""" + } + + val bodyPart = mp.body match { + case StringBody(b, encoding, _) => + new StringPart(nameWithFilename, + b, + mp.contentType.getOrElse(TextPlainContentType), + Charset.forName(encoding)) + case ByteArrayBody(b, _) => + new ByteArrayPart(nameWithFilename, b) + case ByteBufferBody(b, _) => + new ByteArrayPart(nameWithFilename, b.array()) + case InputStreamBody(b, _) => + // sadly async http client only supports parts that are strings, + // byte arrays or files + val baos = new ByteArrayOutputStream() + transfer(b, baos) + new ByteArrayPart(nameWithFilename, baos.toByteArray) + case PathBody(b, _) => + new FilePart(mp.name, b.toFile, null, null, mp.fileName.orNull) + } + + bodyPart.setCustomHeaders( + mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava) + + rb.addBodyPart(bodyPart) + } + + private def readEagerResponse[T]( + response: AsyncResponse, + responseAs: ResponseAs[T, S]): R[Response[T]] = { + val base = readResponseNoBody(response) + + val body = if (codeIsSuccess(base.code)) { + rm.map(eagerResponseHandler(response).handle(responseAs, rm))(Right(_)) + } else { + rm.map(eagerResponseHandler(response).handle(asString, rm))(Left(_)) + } + + rm.map(body) { b: Either[String, T] => + base.copy(body = b) + } + } + + private def readResponseNoBody(response: AsyncResponse): Response[Unit] = { + Response(Right(()), + response.getStatusCode, + response.getHeaders + .iterator() + .asScala + .map(e => (e.getKey, e.getValue)) + .toList, + Nil) + } + + private def eagerResponseHandler(response: AsyncResponse) = + new EagerResponseHandler[S] { + override def handleBasic[T](bra: BasicResponseAs[T, S]): Try[T] = + bra match { + case IgnoreResponse => + // getting the body and discarding it + response.getResponseBodyAsBytes + Try(()) + + case ResponseAsString(enc) => + Try(response.getResponseBody(Charset.forName(enc))) + + case ResponseAsByteArray => + Try(response.getResponseBodyAsBytes) + + case ResponseAsStream() => + Failure( + new IllegalStateException( + "Requested a streaming response, trying to read eagerly.")) + + case ResponseAsFile(file, overwrite) => + Try( + ResponseAs + .saveFile(file, response.getResponseBodyAsStream, overwrite)) + } + } + + override def close(): Unit = { + if (closeClient) + asyncHttpClient.close() + } +} + +object AsyncHttpClientBackend { + + private[asynchttpclient] def defaultClient( + connectionTimeout: Int): AsyncHttpClient = + new DefaultAsyncHttpClient( + new DefaultAsyncHttpClientConfig.Builder() + .setConnectTimeout(connectionTimeout) + .build() + ) +} + +object EmptyPublisher extends Publisher[ByteBuffer] { + override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = { + s.onComplete() + } +} |