From fbc71ee712635ed64c50ca694735a84ec794eb11 Mon Sep 17 00:00:00 2001 From: adamw Date: Thu, 14 Sep 2017 11:03:21 +0100 Subject: Renaming "handler" to "backend" --- .../cats/AsyncHttpClientCatsHandler.scala | 82 ------ .../fs2/AsyncHttpClientFs2Handler.scala | 112 -------- .../future/AsyncHttpClientFutureHandler.scala | 76 ----- .../monix/AsyncHttpClientMonixHandler.scala | 113 -------- .../scalaz/AsyncHttpClientScalazHandler.scala | 79 ------ .../asynchttpclient/AsyncHttpClientHandler.scala | 313 --------------------- 6 files changed, 775 deletions(-) delete mode 100644 async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala delete mode 100644 async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala delete mode 100644 async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala delete mode 100644 async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala delete mode 100644 async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala delete mode 100644 async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala (limited to 'async-http-client-handler') diff --git a/async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala b/async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala deleted file mode 100644 index fd4d88d..0000000 --- a/async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala +++ /dev/null @@ -1,82 +0,0 @@ -package com.softwaremill.sttp.asynchttpclient.cats - -import java.nio.ByteBuffer - -import cats.effect._ -import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler -import com.softwaremill.sttp.{ - FollowRedirectsHandler, - MonadAsyncError, - SttpHandler -} -import org.asynchttpclient.{ - AsyncHttpClient, - AsyncHttpClientConfig, - DefaultAsyncHttpClient -} -import org.reactivestreams.Publisher - -import scala.concurrent.duration.FiniteDuration -import scala.language.higherKinds - -class AsyncHttpClientCatsHandler[F[_]: Async] private ( - asyncHttpClient: AsyncHttpClient, - closeClient: Boolean -) extends AsyncHttpClientHandler[F, Nothing]( - asyncHttpClient, - new AsyncMonad, - closeClient - ) { - override protected def streamBodyToPublisher( - s: Nothing): Publisher[ByteBuffer] = s // nothing is everything - - override protected def publisherToStreamBody( - p: Publisher[ByteBuffer]): Nothing = - throw new IllegalStateException("This handler does not support streaming") - - override protected def publisherToString( - p: Publisher[ByteBuffer]): F[String] = - throw new IllegalStateException("This handler does not support streaming") -} - -object AsyncHttpClientCatsHandler { - - private def apply[F[_]: Async]( - asyncHttpClient: AsyncHttpClient, - closeClient: Boolean): SttpHandler[F, Nothing] = - new FollowRedirectsHandler[F, Nothing]( - new AsyncHttpClientCatsHandler(asyncHttpClient, closeClient)) - - def apply[F[_]: Async]( - connectionTimeout: FiniteDuration = SttpHandler.DefaultConnectionTimeout) - : SttpHandler[F, Nothing] = - AsyncHttpClientCatsHandler( - AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt), - closeClient = true) - - def usingConfig[F[_]: Async]( - cfg: AsyncHttpClientConfig): SttpHandler[F, Nothing] = - AsyncHttpClientCatsHandler(new DefaultAsyncHttpClient(cfg), - closeClient = true) - - def usingClient[F[_]: Async]( - client: AsyncHttpClient): SttpHandler[F, Nothing] = - AsyncHttpClientCatsHandler(client, closeClient = false) -} - -private[cats] class AsyncMonad[F[_]](implicit F: Async[F]) - extends MonadAsyncError[F] { - - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = - F.async(register) - - override def unit[T](t: T): F[T] = F.pure(t) - - override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) - - override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = - F.flatMap(fa)(f) - - override def error[T](t: Throwable): F[T] = F.raiseError(t) -} diff --git a/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala b/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala deleted file mode 100644 index 1462951..0000000 --- a/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala +++ /dev/null @@ -1,112 +0,0 @@ -package com.softwaremill.sttp.asynchttpclient.fs2 - -import java.nio.ByteBuffer - -import cats.effect._ -import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler -import com.softwaremill.sttp.{ - FollowRedirectsHandler, - MonadAsyncError, - SttpHandler, - Utf8, - concatByteBuffers -} -import fs2._ -import fs2.interop.reactivestreams._ -import org.asynchttpclient.{ - AsyncHttpClient, - AsyncHttpClientConfig, - DefaultAsyncHttpClient -} -import org.reactivestreams.Publisher - -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.FiniteDuration -import scala.language.higherKinds - -class AsyncHttpClientFs2Handler[F[_]: Effect] private ( - asyncHttpClient: AsyncHttpClient, - closeClient: Boolean)(implicit ec: ExecutionContext) - extends AsyncHttpClientHandler[F, Stream[F, ByteBuffer]]( - asyncHttpClient, - new EffectMonad, - closeClient - ) { - - override protected def streamBodyToPublisher( - s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] = - s.toUnicastPublisher - - override protected def publisherToStreamBody( - p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] = - p.toStream[F] - - override protected def publisherToString( - p: Publisher[ByteBuffer]): F[String] = { - val bytes = p - .toStream[F] - .runFold(ByteBuffer.allocate(0))(concatByteBuffers) - - implicitly[Effect[F]].map(bytes)(bb => new String(bb.array(), Utf8)) - } -} - -object AsyncHttpClientFs2Handler { - - private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient, - closeClient: Boolean)( - implicit ec: ExecutionContext): SttpHandler[F, Stream[F, ByteBuffer]] = - new FollowRedirectsHandler( - new AsyncHttpClientFs2Handler(asyncHttpClient, closeClient)) - - /** - * @param ec The execution context for running non-network related operations, - * e.g. mapping responses. Defaults to the global execution - * context. - */ - def apply[F[_]: Effect](connectionTimeout: FiniteDuration = - SttpHandler.DefaultConnectionTimeout)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[F, Stream[F, ByteBuffer]] = - AsyncHttpClientFs2Handler[F]( - AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt), - closeClient = true) - - /** - * @param ec The execution context for running non-network related operations, - * e.g. mapping responses. Defaults to the global execution - * context. - */ - def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[F, Stream[F, ByteBuffer]] = - AsyncHttpClientFs2Handler[F](new DefaultAsyncHttpClient(cfg), - closeClient = true) - - /** - * @param ec The execution context for running non-network related operations, - * e.g. mapping responses. Defaults to the global execution - * context. - */ - def usingClient[F[_]: Effect](client: AsyncHttpClient)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[F, Stream[F, ByteBuffer]] = - AsyncHttpClientFs2Handler[F](client, closeClient = false) -} - -private[fs2] class EffectMonad[F[_]](implicit F: Effect[F]) - extends MonadAsyncError[F] { - - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): F[T] = - F.async(register) - - override def unit[T](t: T): F[T] = F.pure(t) - - override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f) - - override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] = - F.flatMap(fa)(f) - - override def error[T](t: Throwable): F[T] = F.raiseError(t) -} diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala deleted file mode 100644 index 371b23c..0000000 --- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala +++ /dev/null @@ -1,76 +0,0 @@ -package com.softwaremill.sttp.asynchttpclient.future - -import java.nio.ByteBuffer - -import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler -import com.softwaremill.sttp.{FollowRedirectsHandler, FutureMonad, SttpHandler} -import org.asynchttpclient.{ - AsyncHttpClient, - AsyncHttpClientConfig, - DefaultAsyncHttpClient -} -import org.reactivestreams.Publisher - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} - -class AsyncHttpClientFutureHandler private ( - asyncHttpClient: AsyncHttpClient, - closeClient: Boolean)(implicit ec: ExecutionContext) - extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient, - new FutureMonad, - closeClient) { - - override protected def streamBodyToPublisher( - s: Nothing): Publisher[ByteBuffer] = s // nothing is everything - - override protected def publisherToStreamBody( - p: Publisher[ByteBuffer]): Nothing = - throw new IllegalStateException("This handler does not support streaming") - - override protected def publisherToString( - p: Publisher[ByteBuffer]): Future[String] = - throw new IllegalStateException("This handler does not support streaming") -} - -object AsyncHttpClientFutureHandler { - - private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( - implicit ec: ExecutionContext): SttpHandler[Future, Nothing] = - new FollowRedirectsHandler[Future, Nothing]( - new AsyncHttpClientFutureHandler(asyncHttpClient, closeClient)) - - /** - * @param ec The execution context for running non-network related operations, - * e.g. mapping responses. Defaults to the global execution - * context. - */ - def apply(connectionTimeout: FiniteDuration = - SttpHandler.DefaultConnectionTimeout)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = - AsyncHttpClientFutureHandler( - AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt), - closeClient = true) - - /** - * @param ec The execution context for running non-network related operations, - * e.g. mapping responses. Defaults to the global execution - * context. - */ - def usingConfig(cfg: AsyncHttpClientConfig)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = - AsyncHttpClientFutureHandler(new DefaultAsyncHttpClient(cfg), - closeClient = true) - - /** - * @param ec The execution context for running non-network related operations, - * e.g. mapping responses. Defaults to the global execution - * context. - */ - def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext = - ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = - AsyncHttpClientFutureHandler(client, closeClient = false) -} diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala deleted file mode 100644 index f01725f..0000000 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala +++ /dev/null @@ -1,113 +0,0 @@ -package com.softwaremill.sttp.asynchttpclient.monix - -import java.nio.ByteBuffer - -import com.softwaremill.sttp.{ - FollowRedirectsHandler, - MonadAsyncError, - SttpHandler, - Utf8, - concatByteBuffers -} -import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler -import monix.eval.Task -import monix.execution.{Cancelable, Scheduler} -import monix.reactive.Observable -import org.asynchttpclient.{ - AsyncHttpClient, - AsyncHttpClientConfig, - DefaultAsyncHttpClient -} -import org.reactivestreams.Publisher - -import scala.concurrent.duration.FiniteDuration -import scala.util.{Failure, Success} - -class AsyncHttpClientMonixHandler private ( - asyncHttpClient: AsyncHttpClient, - closeClient: Boolean)(implicit scheduler: Scheduler) - extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]]( - asyncHttpClient, - TaskMonad, - closeClient) { - - override protected def streamBodyToPublisher( - s: Observable[ByteBuffer]): Publisher[ByteBuffer] = - s.toReactivePublisher - - override protected def publisherToStreamBody( - p: Publisher[ByteBuffer]): Observable[ByteBuffer] = - Observable.fromReactivePublisher(p) - - override protected def publisherToString( - p: Publisher[ByteBuffer]): Task[String] = { - - val bytes = Observable - .fromReactivePublisher(p) - .foldLeftL(ByteBuffer.allocate(0))(concatByteBuffers) - - bytes.map(bb => new String(bb.array(), Utf8)) - } -} - -object AsyncHttpClientMonixHandler { - - private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)( - implicit scheduler: Scheduler) - : SttpHandler[Task, Observable[ByteBuffer]] = - new FollowRedirectsHandler( - new AsyncHttpClientMonixHandler(asyncHttpClient, closeClient)) - - /** - * @param s The scheduler used for streaming request bodies. Defaults to the - * global scheduler. - */ - def apply(connectionTimeout: FiniteDuration = - SttpHandler.DefaultConnectionTimeout)( - implicit s: Scheduler = Scheduler.Implicits.global) - : SttpHandler[Task, Observable[ByteBuffer]] = - AsyncHttpClientMonixHandler( - AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt), - closeClient = true) - - /** - * @param s The scheduler used for streaming request bodies. Defaults to the - * global scheduler. - */ - def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler = - Scheduler.Implicits.global) - : SttpHandler[Task, Observable[ByteBuffer]] = - AsyncHttpClientMonixHandler(new DefaultAsyncHttpClient(cfg), - closeClient = true) - - /** - * @param s The scheduler used for streaming request bodies. Defaults to the - * global scheduler. - */ - def usingClient(client: AsyncHttpClient)(implicit s: Scheduler = - Scheduler.Implicits.global) - : SttpHandler[Task, Observable[ByteBuffer]] = - AsyncHttpClientMonixHandler(client, closeClient = false) -} - -private[monix] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.now(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { (_, cb) => - register { - case Left(t) => cb(Failure(t)) - case Right(t) => cb(Success(t)) - } - - Cancelable.empty - } - - override def error[T](t: Throwable): Task[T] = Task.raiseError(t) -} diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala deleted file mode 100644 index 2ba00d7..0000000 --- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala +++ /dev/null @@ -1,79 +0,0 @@ -package com.softwaremill.sttp.asynchttpclient.scalaz - -import java.nio.ByteBuffer - -import com.softwaremill.sttp.{ - FollowRedirectsHandler, - MonadAsyncError, - SttpHandler -} -import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler -import org.asynchttpclient.{ - AsyncHttpClient, - AsyncHttpClientConfig, - DefaultAsyncHttpClient -} -import org.reactivestreams.Publisher - -import scala.concurrent.duration.FiniteDuration -import scalaz.{-\/, \/-} -import scalaz.concurrent.Task - -class AsyncHttpClientScalazHandler private (asyncHttpClient: AsyncHttpClient, - closeClient: Boolean) - extends AsyncHttpClientHandler[Task, Nothing](asyncHttpClient, - TaskMonad, - closeClient) { - - override protected def streamBodyToPublisher( - s: Nothing): Publisher[ByteBuffer] = s // nothing is everything - - override protected def publisherToStreamBody( - p: Publisher[ByteBuffer]): Nothing = - throw new IllegalStateException("This handler does not support streaming") - - override protected def publisherToString( - p: Publisher[ByteBuffer]): Task[String] = - throw new IllegalStateException("This handler does not support streaming") -} - -object AsyncHttpClientScalazHandler { - private def apply(asyncHttpClient: AsyncHttpClient, - closeClient: Boolean): SttpHandler[Task, Nothing] = - new FollowRedirectsHandler[Task, Nothing]( - new AsyncHttpClientScalazHandler(asyncHttpClient, closeClient)) - - def apply( - connectionTimeout: FiniteDuration = SttpHandler.DefaultConnectionTimeout) - : SttpHandler[Task, Nothing] = - AsyncHttpClientScalazHandler( - AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt), - closeClient = true) - - def usingConfig(cfg: AsyncHttpClientConfig): SttpHandler[Task, Nothing] = - AsyncHttpClientScalazHandler(new DefaultAsyncHttpClient(cfg), - closeClient = true) - - def usingClient(client: AsyncHttpClient): SttpHandler[Task, Nothing] = - AsyncHttpClientScalazHandler(client, closeClient = false) -} - -private[scalaz] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.point(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { cb => - register { - case Left(t) => cb(-\/(t)) - case Right(t) => cb(\/-(t)) - } - } - - override def error[T](t: Throwable): Task[T] = Task.fail(t) -} diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala deleted file mode 100644 index 8b6d49c..0000000 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ /dev/null @@ -1,313 +0,0 @@ -package com.softwaremill.sttp.asynchttpclient - -import java.io.ByteArrayOutputStream -import java.nio.ByteBuffer -import java.nio.charset.Charset - -import com.softwaremill.sttp.ResponseAs.EagerResponseHandler -import com.softwaremill.sttp._ -import org.asynchttpclient.AsyncHandler.State -import org.asynchttpclient.handler.StreamedAsyncHandler -import org.asynchttpclient.request.body.multipart.{ - ByteArrayPart, - FilePart, - StringPart -} -import org.asynchttpclient.{ - AsyncCompletionHandler, - AsyncHandler, - AsyncHttpClient, - DefaultAsyncHttpClientConfig, - DefaultAsyncHttpClient, - HttpResponseBodyPart, - HttpResponseHeaders, - HttpResponseStatus, - Param, - RequestBuilder, - Request => AsyncRequest, - Response => AsyncResponse -} -import org.reactivestreams.{Publisher, Subscriber, Subscription} - -import scala.collection.JavaConverters._ -import scala.language.higherKinds -import scala.util.{Failure, Try} - -abstract class AsyncHttpClientHandler[R[_], S](asyncHttpClient: AsyncHttpClient, - rm: MonadAsyncError[R], - closeClient: Boolean) - extends SttpHandler[R, S] { - - override def send[T](r: Request[T, S]): R[Response[T]] = { - val preparedRequest = asyncHttpClient - .prepareRequest(requestToAsync(r)) - - rm.flatten(rm.async[R[Response[T]]] { cb => - def success(r: R[Response[T]]) = cb(Right(r)) - def error(t: Throwable) = cb(Left(t)) - - r.response match { - case ras @ ResponseAsStream() => - preparedRequest - .execute(streamingAsyncHandler(ras, success, error)) - - case ra => - preparedRequest - .execute(eagerAsyncHandler(ra, success, error)) - } - }) - } - - override def responseMonad: MonadError[R] = rm - - protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer] - - protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S - - protected def publisherToString(p: Publisher[ByteBuffer]): R[String] - - private def eagerAsyncHandler[T]( - responseAs: ResponseAs[T, S], - success: R[Response[T]] => Unit, - error: Throwable => Unit): AsyncHandler[Unit] = { - - new AsyncCompletionHandler[Unit] { - override def onCompleted(response: AsyncResponse): Unit = - success(readEagerResponse(response, responseAs)) - - override def onThrowable(t: Throwable): Unit = error(t) - } - } - - private def streamingAsyncHandler[T]( - responseAs: ResponseAsStream[T, S], - success: R[Response[T]] => Unit, - error: Throwable => Unit): AsyncHandler[Unit] = { - new StreamedAsyncHandler[Unit] { - private val builder = new AsyncResponse.ResponseBuilder() - private var publisher: Option[Publisher[ByteBuffer]] = None - private var completed = false - - override def onStream( - p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { - // Sadly we don't have .map on Publisher - publisher = Some(new Publisher[ByteBuffer] { - override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = - p.subscribe(new Subscriber[HttpResponseBodyPart] { - override def onError(t: Throwable): Unit = s.onError(t) - override def onComplete(): Unit = s.onComplete() - override def onNext(t: HttpResponseBodyPart): Unit = - s.onNext(t.getBodyByteBuffer) - override def onSubscribe(v: Subscription): Unit = - s.onSubscribe(v) - }) - }) - // #2: sometimes onCompleted() isn't called, only onStream(); this - // seems to be true esp for https sites. For these cases, completing - // the request here. - doComplete() - State.CONTINUE - } - - override def onBodyPartReceived( - bodyPart: HttpResponseBodyPart): AsyncHandler.State = - throw new IllegalStateException( - "Requested a streaming handler, unexpected eager body parts.") - - override def onHeadersReceived( - headers: HttpResponseHeaders): AsyncHandler.State = { - builder.accumulate(headers) - State.CONTINUE - } - - override def onStatusReceived( - responseStatus: HttpResponseStatus): AsyncHandler.State = { - builder.accumulate(responseStatus) - State.CONTINUE - } - - override def onCompleted(): Unit = { - // if the request had no body, onStream() will never be called - doComplete() - } - - private def doComplete(): Unit = { - if (!completed) { - completed = true - - val baseResponse = readResponseNoBody(builder.build()) - val p = publisher.getOrElse(EmptyPublisher) - val s = publisherToStreamBody(p) - val b = if (codeIsSuccess(baseResponse.code)) { - rm.unit(Right(responseAs.responseIsStream(s))) - } else { - rm.map(publisherToString(p))(Left(_)) - } - - success(rm.map(b) { bb: Either[String, T] => - baseResponse.copy(body = bb) - }) - } - } - - override def onThrowable(t: Throwable): Unit = { - error(t) - } - } - } - - private def requestToAsync(r: Request[_, S]): AsyncRequest = { - val readTimeout = r.options.readTimeout - val rb = new RequestBuilder(r.method.m) - .setUrl(r.uri.toString) - .setRequestTimeout( - if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1) - r.headers.foreach { case (k, v) => rb.setHeader(k, v) } - setBody(r, r.body, rb) - rb.build() - } - - private def setBody(r: Request[_, S], - body: RequestBody[S], - rb: RequestBuilder): Unit = { - body match { - case NoBody => // skip - - case StringBody(b, encoding, _) => - rb.setBody(b.getBytes(encoding)) - - case ByteArrayBody(b, _) => - rb.setBody(b) - - case ByteBufferBody(b, _) => - rb.setBody(b) - - case InputStreamBody(b, _) => - rb.setBody(b) - - case PathBody(b, _) => - rb.setBody(b.toFile) - - case StreamBody(s) => - val cl = r.headers - .find(_._1.equalsIgnoreCase(ContentLengthHeader)) - .map(_._2.toLong) - .getOrElse(-1L) - rb.setBody(streamBodyToPublisher(s), cl) - - case MultipartBody(ps) => - ps.foreach(addMultipartBody(rb, _)) - } - } - - private def addMultipartBody(rb: RequestBuilder, mp: Multipart): Unit = { - // async http client only supports setting file names on file parts. To - // set a file name on an arbitrary part we have to use a small "work - // around", combining the file name with the name (surrounding quotes - // are added by ahc). - def nameWithFilename = mp.fileName.fold(mp.name) { fn => - s"""${mp.name}"; filename="$fn""" - } - - val bodyPart = mp.body match { - case StringBody(b, encoding, _) => - new StringPart(nameWithFilename, - b, - mp.contentType.getOrElse(TextPlainContentType), - Charset.forName(encoding)) - case ByteArrayBody(b, _) => - new ByteArrayPart(nameWithFilename, b) - case ByteBufferBody(b, _) => - new ByteArrayPart(nameWithFilename, b.array()) - case InputStreamBody(b, _) => - // sadly async http client only supports parts that are strings, - // byte arrays or files - val baos = new ByteArrayOutputStream() - transfer(b, baos) - new ByteArrayPart(nameWithFilename, baos.toByteArray) - case PathBody(b, _) => - new FilePart(mp.name, b.toFile, null, null, mp.fileName.orNull) - } - - bodyPart.setCustomHeaders( - mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava) - - rb.addBodyPart(bodyPart) - } - - private def readEagerResponse[T]( - response: AsyncResponse, - responseAs: ResponseAs[T, S]): R[Response[T]] = { - val base = readResponseNoBody(response) - - val body = if (codeIsSuccess(base.code)) { - rm.map(eagerResponseHandler(response).handle(responseAs, rm))(Right(_)) - } else { - rm.map(eagerResponseHandler(response).handle(asString, rm))(Left(_)) - } - - rm.map(body) { b: Either[String, T] => - base.copy(body = b) - } - } - - private def readResponseNoBody(response: AsyncResponse): Response[Unit] = { - Response(Right(()), - response.getStatusCode, - response.getHeaders - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toList, - Nil) - } - - private def eagerResponseHandler(response: AsyncResponse) = - new EagerResponseHandler[S] { - override def handleBasic[T](bra: BasicResponseAs[T, S]): Try[T] = - bra match { - case IgnoreResponse => - // getting the body and discarding it - response.getResponseBodyAsBytes - Try(()) - - case ResponseAsString(enc) => - Try(response.getResponseBody(Charset.forName(enc))) - - case ResponseAsByteArray => - Try(response.getResponseBodyAsBytes) - - case ResponseAsStream() => - Failure( - new IllegalStateException( - "Requested a streaming response, trying to read eagerly.")) - - case ResponseAsFile(file, overwrite) => - Try( - ResponseAs - .saveFile(file, response.getResponseBodyAsStream, overwrite)) - } - } - - override def close(): Unit = { - if (closeClient) - asyncHttpClient.close() - } -} - -object AsyncHttpClientHandler { - - private[asynchttpclient] def defaultClient( - connectionTimeout: Int): AsyncHttpClient = - new DefaultAsyncHttpClient( - new DefaultAsyncHttpClientConfig.Builder() - .setConnectTimeout(connectionTimeout) - .build() - ) -} - -object EmptyPublisher extends Publisher[ByteBuffer] { - override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = { - s.onComplete() - } -} -- cgit v1.2.3