aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-handler
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-09-14 11:03:21 +0100
committeradamw <adam@warski.org>2017-09-14 11:03:21 +0100
commitfbc71ee712635ed64c50ca694735a84ec794eb11 (patch)
treebf1dd7335306b7f320262d45d0d5b6d02f5a0b27 /async-http-client-handler
parenta971d409cb1063a2089d936abf3d3ab70bbbabb6 (diff)
downloadsttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.gz
sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.tar.bz2
sttp-fbc71ee712635ed64c50ca694735a84ec794eb11.zip
Renaming "handler" to "backend"
Diffstat (limited to 'async-http-client-handler')
-rw-r--r--async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala82
-rw-r--r--async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala112
-rw-r--r--async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala76
-rw-r--r--async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala113
-rw-r--r--async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala79
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala313
6 files changed, 0 insertions, 775 deletions
diff --git a/async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala b/async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala
deleted file mode 100644
index fd4d88d..0000000
--- a/async-http-client-handler/cats/src/main/scala/com/softwaremill/sttp/asynchttpclient/cats/AsyncHttpClientCatsHandler.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.softwaremill.sttp.asynchttpclient.cats
-
-import java.nio.ByteBuffer
-
-import cats.effect._
-import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
-import com.softwaremill.sttp.{
- FollowRedirectsHandler,
- MonadAsyncError,
- SttpHandler
-}
-import org.asynchttpclient.{
- AsyncHttpClient,
- AsyncHttpClientConfig,
- DefaultAsyncHttpClient
-}
-import org.reactivestreams.Publisher
-
-import scala.concurrent.duration.FiniteDuration
-import scala.language.higherKinds
-
-class AsyncHttpClientCatsHandler[F[_]: Async] private (
- asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean
-) extends AsyncHttpClientHandler[F, Nothing](
- asyncHttpClient,
- new AsyncMonad,
- closeClient
- ) {
- override protected def streamBodyToPublisher(
- s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
-
- override protected def publisherToStreamBody(
- p: Publisher[ByteBuffer]): Nothing =
- throw new IllegalStateException("This handler does not support streaming")
-
- override protected def publisherToString(
- p: Publisher[ByteBuffer]): F[String] =
- throw new IllegalStateException("This handler does not support streaming")
-}
-
-object AsyncHttpClientCatsHandler {
-
- private def apply[F[_]: Async](
- asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean): SttpHandler[F, Nothing] =
- new FollowRedirectsHandler[F, Nothing](
- new AsyncHttpClientCatsHandler(asyncHttpClient, closeClient))
-
- def apply[F[_]: Async](
- connectionTimeout: FiniteDuration = SttpHandler.DefaultConnectionTimeout)
- : SttpHandler[F, Nothing] =
- AsyncHttpClientCatsHandler(
- AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt),
- closeClient = true)
-
- def usingConfig[F[_]: Async](
- cfg: AsyncHttpClientConfig): SttpHandler[F, Nothing] =
- AsyncHttpClientCatsHandler(new DefaultAsyncHttpClient(cfg),
- closeClient = true)
-
- def usingClient[F[_]: Async](
- client: AsyncHttpClient): SttpHandler[F, Nothing] =
- AsyncHttpClientCatsHandler(client, closeClient = false)
-}
-
-private[cats] class AsyncMonad[F[_]](implicit F: Async[F])
- extends MonadAsyncError[F] {
-
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
- F.async(register)
-
- override def unit[T](t: T): F[T] = F.pure(t)
-
- override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
-
- override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
- F.flatMap(fa)(f)
-
- override def error[T](t: Throwable): F[T] = F.raiseError(t)
-}
diff --git a/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala b/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala
deleted file mode 100644
index 1462951..0000000
--- a/async-http-client-handler/fs2/src/main/scala/com/softwaremill/sttp/asynchttpclient/fs2/AsyncHttpClientFs2Handler.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.softwaremill.sttp.asynchttpclient.fs2
-
-import java.nio.ByteBuffer
-
-import cats.effect._
-import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
-import com.softwaremill.sttp.{
- FollowRedirectsHandler,
- MonadAsyncError,
- SttpHandler,
- Utf8,
- concatByteBuffers
-}
-import fs2._
-import fs2.interop.reactivestreams._
-import org.asynchttpclient.{
- AsyncHttpClient,
- AsyncHttpClientConfig,
- DefaultAsyncHttpClient
-}
-import org.reactivestreams.Publisher
-
-import scala.concurrent.ExecutionContext
-import scala.concurrent.duration.FiniteDuration
-import scala.language.higherKinds
-
-class AsyncHttpClientFs2Handler[F[_]: Effect] private (
- asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)(implicit ec: ExecutionContext)
- extends AsyncHttpClientHandler[F, Stream[F, ByteBuffer]](
- asyncHttpClient,
- new EffectMonad,
- closeClient
- ) {
-
- override protected def streamBodyToPublisher(
- s: Stream[F, ByteBuffer]): Publisher[ByteBuffer] =
- s.toUnicastPublisher
-
- override protected def publisherToStreamBody(
- p: Publisher[ByteBuffer]): Stream[F, ByteBuffer] =
- p.toStream[F]
-
- override protected def publisherToString(
- p: Publisher[ByteBuffer]): F[String] = {
- val bytes = p
- .toStream[F]
- .runFold(ByteBuffer.allocate(0))(concatByteBuffers)
-
- implicitly[Effect[F]].map(bytes)(bb => new String(bb.array(), Utf8))
- }
-}
-
-object AsyncHttpClientFs2Handler {
-
- private def apply[F[_]: Effect](asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)(
- implicit ec: ExecutionContext): SttpHandler[F, Stream[F, ByteBuffer]] =
- new FollowRedirectsHandler(
- new AsyncHttpClientFs2Handler(asyncHttpClient, closeClient))
-
- /**
- * @param ec The execution context for running non-network related operations,
- * e.g. mapping responses. Defaults to the global execution
- * context.
- */
- def apply[F[_]: Effect](connectionTimeout: FiniteDuration =
- SttpHandler.DefaultConnectionTimeout)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[F, Stream[F, ByteBuffer]] =
- AsyncHttpClientFs2Handler[F](
- AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt),
- closeClient = true)
-
- /**
- * @param ec The execution context for running non-network related operations,
- * e.g. mapping responses. Defaults to the global execution
- * context.
- */
- def usingConfig[F[_]: Effect](cfg: AsyncHttpClientConfig)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[F, Stream[F, ByteBuffer]] =
- AsyncHttpClientFs2Handler[F](new DefaultAsyncHttpClient(cfg),
- closeClient = true)
-
- /**
- * @param ec The execution context for running non-network related operations,
- * e.g. mapping responses. Defaults to the global execution
- * context.
- */
- def usingClient[F[_]: Effect](client: AsyncHttpClient)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[F, Stream[F, ByteBuffer]] =
- AsyncHttpClientFs2Handler[F](client, closeClient = false)
-}
-
-private[fs2] class EffectMonad[F[_]](implicit F: Effect[F])
- extends MonadAsyncError[F] {
-
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): F[T] =
- F.async(register)
-
- override def unit[T](t: T): F[T] = F.pure(t)
-
- override def map[T, T2](fa: F[T])(f: (T) => T2): F[T2] = F.map(fa)(f)
-
- override def flatMap[T, T2](fa: F[T])(f: (T) => F[T2]): F[T2] =
- F.flatMap(fa)(f)
-
- override def error[T](t: Throwable): F[T] = F.raiseError(t)
-}
diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala
deleted file mode 100644
index 371b23c..0000000
--- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/AsyncHttpClientFutureHandler.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.softwaremill.sttp.asynchttpclient.future
-
-import java.nio.ByteBuffer
-
-import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
-import com.softwaremill.sttp.{FollowRedirectsHandler, FutureMonad, SttpHandler}
-import org.asynchttpclient.{
- AsyncHttpClient,
- AsyncHttpClientConfig,
- DefaultAsyncHttpClient
-}
-import org.reactivestreams.Publisher
-
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsyncHttpClientFutureHandler private (
- asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)(implicit ec: ExecutionContext)
- extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient,
- new FutureMonad,
- closeClient) {
-
- override protected def streamBodyToPublisher(
- s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
-
- override protected def publisherToStreamBody(
- p: Publisher[ByteBuffer]): Nothing =
- throw new IllegalStateException("This handler does not support streaming")
-
- override protected def publisherToString(
- p: Publisher[ByteBuffer]): Future[String] =
- throw new IllegalStateException("This handler does not support streaming")
-}
-
-object AsyncHttpClientFutureHandler {
-
- private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
- implicit ec: ExecutionContext): SttpHandler[Future, Nothing] =
- new FollowRedirectsHandler[Future, Nothing](
- new AsyncHttpClientFutureHandler(asyncHttpClient, closeClient))
-
- /**
- * @param ec The execution context for running non-network related operations,
- * e.g. mapping responses. Defaults to the global execution
- * context.
- */
- def apply(connectionTimeout: FiniteDuration =
- SttpHandler.DefaultConnectionTimeout)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[Future, Nothing] =
- AsyncHttpClientFutureHandler(
- AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt),
- closeClient = true)
-
- /**
- * @param ec The execution context for running non-network related operations,
- * e.g. mapping responses. Defaults to the global execution
- * context.
- */
- def usingConfig(cfg: AsyncHttpClientConfig)(
- implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[Future, Nothing] =
- AsyncHttpClientFutureHandler(new DefaultAsyncHttpClient(cfg),
- closeClient = true)
-
- /**
- * @param ec The execution context for running non-network related operations,
- * e.g. mapping responses. Defaults to the global execution
- * context.
- */
- def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext =
- ExecutionContext.Implicits.global)
- : SttpHandler[Future, Nothing] =
- AsyncHttpClientFutureHandler(client, closeClient = false)
-}
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala
deleted file mode 100644
index f01725f..0000000
--- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/AsyncHttpClientMonixHandler.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-package com.softwaremill.sttp.asynchttpclient.monix
-
-import java.nio.ByteBuffer
-
-import com.softwaremill.sttp.{
- FollowRedirectsHandler,
- MonadAsyncError,
- SttpHandler,
- Utf8,
- concatByteBuffers
-}
-import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
-import monix.eval.Task
-import monix.execution.{Cancelable, Scheduler}
-import monix.reactive.Observable
-import org.asynchttpclient.{
- AsyncHttpClient,
- AsyncHttpClientConfig,
- DefaultAsyncHttpClient
-}
-import org.reactivestreams.Publisher
-
-import scala.concurrent.duration.FiniteDuration
-import scala.util.{Failure, Success}
-
-class AsyncHttpClientMonixHandler private (
- asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)(implicit scheduler: Scheduler)
- extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]](
- asyncHttpClient,
- TaskMonad,
- closeClient) {
-
- override protected def streamBodyToPublisher(
- s: Observable[ByteBuffer]): Publisher[ByteBuffer] =
- s.toReactivePublisher
-
- override protected def publisherToStreamBody(
- p: Publisher[ByteBuffer]): Observable[ByteBuffer] =
- Observable.fromReactivePublisher(p)
-
- override protected def publisherToString(
- p: Publisher[ByteBuffer]): Task[String] = {
-
- val bytes = Observable
- .fromReactivePublisher(p)
- .foldLeftL(ByteBuffer.allocate(0))(concatByteBuffers)
-
- bytes.map(bb => new String(bb.array(), Utf8))
- }
-}
-
-object AsyncHttpClientMonixHandler {
-
- private def apply(asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(
- implicit scheduler: Scheduler)
- : SttpHandler[Task, Observable[ByteBuffer]] =
- new FollowRedirectsHandler(
- new AsyncHttpClientMonixHandler(asyncHttpClient, closeClient))
-
- /**
- * @param s The scheduler used for streaming request bodies. Defaults to the
- * global scheduler.
- */
- def apply(connectionTimeout: FiniteDuration =
- SttpHandler.DefaultConnectionTimeout)(
- implicit s: Scheduler = Scheduler.Implicits.global)
- : SttpHandler[Task, Observable[ByteBuffer]] =
- AsyncHttpClientMonixHandler(
- AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt),
- closeClient = true)
-
- /**
- * @param s The scheduler used for streaming request bodies. Defaults to the
- * global scheduler.
- */
- def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler =
- Scheduler.Implicits.global)
- : SttpHandler[Task, Observable[ByteBuffer]] =
- AsyncHttpClientMonixHandler(new DefaultAsyncHttpClient(cfg),
- closeClient = true)
-
- /**
- * @param s The scheduler used for streaming request bodies. Defaults to the
- * global scheduler.
- */
- def usingClient(client: AsyncHttpClient)(implicit s: Scheduler =
- Scheduler.Implicits.global)
- : SttpHandler[Task, Observable[ByteBuffer]] =
- AsyncHttpClientMonixHandler(client, closeClient = false)
-}
-
-private[monix] object TaskMonad extends MonadAsyncError[Task] {
- override def unit[T](t: T): Task[T] = Task.now(t)
-
- override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
- fa.flatMap(f)
-
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
- Task.async { (_, cb) =>
- register {
- case Left(t) => cb(Failure(t))
- case Right(t) => cb(Success(t))
- }
-
- Cancelable.empty
- }
-
- override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
-}
diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala
deleted file mode 100644
index 2ba00d7..0000000
--- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/AsyncHttpClientScalazHandler.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.softwaremill.sttp.asynchttpclient.scalaz
-
-import java.nio.ByteBuffer
-
-import com.softwaremill.sttp.{
- FollowRedirectsHandler,
- MonadAsyncError,
- SttpHandler
-}
-import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
-import org.asynchttpclient.{
- AsyncHttpClient,
- AsyncHttpClientConfig,
- DefaultAsyncHttpClient
-}
-import org.reactivestreams.Publisher
-
-import scala.concurrent.duration.FiniteDuration
-import scalaz.{-\/, \/-}
-import scalaz.concurrent.Task
-
-class AsyncHttpClientScalazHandler private (asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean)
- extends AsyncHttpClientHandler[Task, Nothing](asyncHttpClient,
- TaskMonad,
- closeClient) {
-
- override protected def streamBodyToPublisher(
- s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
-
- override protected def publisherToStreamBody(
- p: Publisher[ByteBuffer]): Nothing =
- throw new IllegalStateException("This handler does not support streaming")
-
- override protected def publisherToString(
- p: Publisher[ByteBuffer]): Task[String] =
- throw new IllegalStateException("This handler does not support streaming")
-}
-
-object AsyncHttpClientScalazHandler {
- private def apply(asyncHttpClient: AsyncHttpClient,
- closeClient: Boolean): SttpHandler[Task, Nothing] =
- new FollowRedirectsHandler[Task, Nothing](
- new AsyncHttpClientScalazHandler(asyncHttpClient, closeClient))
-
- def apply(
- connectionTimeout: FiniteDuration = SttpHandler.DefaultConnectionTimeout)
- : SttpHandler[Task, Nothing] =
- AsyncHttpClientScalazHandler(
- AsyncHttpClientHandler.defaultClient(connectionTimeout.toMillis.toInt),
- closeClient = true)
-
- def usingConfig(cfg: AsyncHttpClientConfig): SttpHandler[Task, Nothing] =
- AsyncHttpClientScalazHandler(new DefaultAsyncHttpClient(cfg),
- closeClient = true)
-
- def usingClient(client: AsyncHttpClient): SttpHandler[Task, Nothing] =
- AsyncHttpClientScalazHandler(client, closeClient = false)
-}
-
-private[scalaz] object TaskMonad extends MonadAsyncError[Task] {
- override def unit[T](t: T): Task[T] = Task.point(t)
-
- override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
- fa.flatMap(f)
-
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
- Task.async { cb =>
- register {
- case Left(t) => cb(-\/(t))
- case Right(t) => cb(\/-(t))
- }
- }
-
- override def error[T](t: Throwable): Task[T] = Task.fail(t)
-}
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
deleted file mode 100644
index 8b6d49c..0000000
--- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
+++ /dev/null
@@ -1,313 +0,0 @@
-package com.softwaremill.sttp.asynchttpclient
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-import java.nio.charset.Charset
-
-import com.softwaremill.sttp.ResponseAs.EagerResponseHandler
-import com.softwaremill.sttp._
-import org.asynchttpclient.AsyncHandler.State
-import org.asynchttpclient.handler.StreamedAsyncHandler
-import org.asynchttpclient.request.body.multipart.{
- ByteArrayPart,
- FilePart,
- StringPart
-}
-import org.asynchttpclient.{
- AsyncCompletionHandler,
- AsyncHandler,
- AsyncHttpClient,
- DefaultAsyncHttpClientConfig,
- DefaultAsyncHttpClient,
- HttpResponseBodyPart,
- HttpResponseHeaders,
- HttpResponseStatus,
- Param,
- RequestBuilder,
- Request => AsyncRequest,
- Response => AsyncResponse
-}
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
-import scala.collection.JavaConverters._
-import scala.language.higherKinds
-import scala.util.{Failure, Try}
-
-abstract class AsyncHttpClientHandler[R[_], S](asyncHttpClient: AsyncHttpClient,
- rm: MonadAsyncError[R],
- closeClient: Boolean)
- extends SttpHandler[R, S] {
-
- override def send[T](r: Request[T, S]): R[Response[T]] = {
- val preparedRequest = asyncHttpClient
- .prepareRequest(requestToAsync(r))
-
- rm.flatten(rm.async[R[Response[T]]] { cb =>
- def success(r: R[Response[T]]) = cb(Right(r))
- def error(t: Throwable) = cb(Left(t))
-
- r.response match {
- case ras @ ResponseAsStream() =>
- preparedRequest
- .execute(streamingAsyncHandler(ras, success, error))
-
- case ra =>
- preparedRequest
- .execute(eagerAsyncHandler(ra, success, error))
- }
- })
- }
-
- override def responseMonad: MonadError[R] = rm
-
- protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer]
-
- protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S
-
- protected def publisherToString(p: Publisher[ByteBuffer]): R[String]
-
- private def eagerAsyncHandler[T](
- responseAs: ResponseAs[T, S],
- success: R[Response[T]] => Unit,
- error: Throwable => Unit): AsyncHandler[Unit] = {
-
- new AsyncCompletionHandler[Unit] {
- override def onCompleted(response: AsyncResponse): Unit =
- success(readEagerResponse(response, responseAs))
-
- override def onThrowable(t: Throwable): Unit = error(t)
- }
- }
-
- private def streamingAsyncHandler[T](
- responseAs: ResponseAsStream[T, S],
- success: R[Response[T]] => Unit,
- error: Throwable => Unit): AsyncHandler[Unit] = {
- new StreamedAsyncHandler[Unit] {
- private val builder = new AsyncResponse.ResponseBuilder()
- private var publisher: Option[Publisher[ByteBuffer]] = None
- private var completed = false
-
- override def onStream(
- p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = {
- // Sadly we don't have .map on Publisher
- publisher = Some(new Publisher[ByteBuffer] {
- override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit =
- p.subscribe(new Subscriber[HttpResponseBodyPart] {
- override def onError(t: Throwable): Unit = s.onError(t)
- override def onComplete(): Unit = s.onComplete()
- override def onNext(t: HttpResponseBodyPart): Unit =
- s.onNext(t.getBodyByteBuffer)
- override def onSubscribe(v: Subscription): Unit =
- s.onSubscribe(v)
- })
- })
- // #2: sometimes onCompleted() isn't called, only onStream(); this
- // seems to be true esp for https sites. For these cases, completing
- // the request here.
- doComplete()
- State.CONTINUE
- }
-
- override def onBodyPartReceived(
- bodyPart: HttpResponseBodyPart): AsyncHandler.State =
- throw new IllegalStateException(
- "Requested a streaming handler, unexpected eager body parts.")
-
- override def onHeadersReceived(
- headers: HttpResponseHeaders): AsyncHandler.State = {
- builder.accumulate(headers)
- State.CONTINUE
- }
-
- override def onStatusReceived(
- responseStatus: HttpResponseStatus): AsyncHandler.State = {
- builder.accumulate(responseStatus)
- State.CONTINUE
- }
-
- override def onCompleted(): Unit = {
- // if the request had no body, onStream() will never be called
- doComplete()
- }
-
- private def doComplete(): Unit = {
- if (!completed) {
- completed = true
-
- val baseResponse = readResponseNoBody(builder.build())
- val p = publisher.getOrElse(EmptyPublisher)
- val s = publisherToStreamBody(p)
- val b = if (codeIsSuccess(baseResponse.code)) {
- rm.unit(Right(responseAs.responseIsStream(s)))
- } else {
- rm.map(publisherToString(p))(Left(_))
- }
-
- success(rm.map(b) { bb: Either[String, T] =>
- baseResponse.copy(body = bb)
- })
- }
- }
-
- override def onThrowable(t: Throwable): Unit = {
- error(t)
- }
- }
- }
-
- private def requestToAsync(r: Request[_, S]): AsyncRequest = {
- val readTimeout = r.options.readTimeout
- val rb = new RequestBuilder(r.method.m)
- .setUrl(r.uri.toString)
- .setRequestTimeout(
- if (readTimeout.isFinite()) readTimeout.toMillis.toInt else -1)
- r.headers.foreach { case (k, v) => rb.setHeader(k, v) }
- setBody(r, r.body, rb)
- rb.build()
- }
-
- private def setBody(r: Request[_, S],
- body: RequestBody[S],
- rb: RequestBuilder): Unit = {
- body match {
- case NoBody => // skip
-
- case StringBody(b, encoding, _) =>
- rb.setBody(b.getBytes(encoding))
-
- case ByteArrayBody(b, _) =>
- rb.setBody(b)
-
- case ByteBufferBody(b, _) =>
- rb.setBody(b)
-
- case InputStreamBody(b, _) =>
- rb.setBody(b)
-
- case PathBody(b, _) =>
- rb.setBody(b.toFile)
-
- case StreamBody(s) =>
- val cl = r.headers
- .find(_._1.equalsIgnoreCase(ContentLengthHeader))
- .map(_._2.toLong)
- .getOrElse(-1L)
- rb.setBody(streamBodyToPublisher(s), cl)
-
- case MultipartBody(ps) =>
- ps.foreach(addMultipartBody(rb, _))
- }
- }
-
- private def addMultipartBody(rb: RequestBuilder, mp: Multipart): Unit = {
- // async http client only supports setting file names on file parts. To
- // set a file name on an arbitrary part we have to use a small "work
- // around", combining the file name with the name (surrounding quotes
- // are added by ahc).
- def nameWithFilename = mp.fileName.fold(mp.name) { fn =>
- s"""${mp.name}"; filename="$fn"""
- }
-
- val bodyPart = mp.body match {
- case StringBody(b, encoding, _) =>
- new StringPart(nameWithFilename,
- b,
- mp.contentType.getOrElse(TextPlainContentType),
- Charset.forName(encoding))
- case ByteArrayBody(b, _) =>
- new ByteArrayPart(nameWithFilename, b)
- case ByteBufferBody(b, _) =>
- new ByteArrayPart(nameWithFilename, b.array())
- case InputStreamBody(b, _) =>
- // sadly async http client only supports parts that are strings,
- // byte arrays or files
- val baos = new ByteArrayOutputStream()
- transfer(b, baos)
- new ByteArrayPart(nameWithFilename, baos.toByteArray)
- case PathBody(b, _) =>
- new FilePart(mp.name, b.toFile, null, null, mp.fileName.orNull)
- }
-
- bodyPart.setCustomHeaders(
- mp.additionalHeaders.map(h => new Param(h._1, h._2)).toList.asJava)
-
- rb.addBodyPart(bodyPart)
- }
-
- private def readEagerResponse[T](
- response: AsyncResponse,
- responseAs: ResponseAs[T, S]): R[Response[T]] = {
- val base = readResponseNoBody(response)
-
- val body = if (codeIsSuccess(base.code)) {
- rm.map(eagerResponseHandler(response).handle(responseAs, rm))(Right(_))
- } else {
- rm.map(eagerResponseHandler(response).handle(asString, rm))(Left(_))
- }
-
- rm.map(body) { b: Either[String, T] =>
- base.copy(body = b)
- }
- }
-
- private def readResponseNoBody(response: AsyncResponse): Response[Unit] = {
- Response(Right(()),
- response.getStatusCode,
- response.getHeaders
- .iterator()
- .asScala
- .map(e => (e.getKey, e.getValue))
- .toList,
- Nil)
- }
-
- private def eagerResponseHandler(response: AsyncResponse) =
- new EagerResponseHandler[S] {
- override def handleBasic[T](bra: BasicResponseAs[T, S]): Try[T] =
- bra match {
- case IgnoreResponse =>
- // getting the body and discarding it
- response.getResponseBodyAsBytes
- Try(())
-
- case ResponseAsString(enc) =>
- Try(response.getResponseBody(Charset.forName(enc)))
-
- case ResponseAsByteArray =>
- Try(response.getResponseBodyAsBytes)
-
- case ResponseAsStream() =>
- Failure(
- new IllegalStateException(
- "Requested a streaming response, trying to read eagerly."))
-
- case ResponseAsFile(file, overwrite) =>
- Try(
- ResponseAs
- .saveFile(file, response.getResponseBodyAsStream, overwrite))
- }
- }
-
- override def close(): Unit = {
- if (closeClient)
- asyncHttpClient.close()
- }
-}
-
-object AsyncHttpClientHandler {
-
- private[asynchttpclient] def defaultClient(
- connectionTimeout: Int): AsyncHttpClient =
- new DefaultAsyncHttpClient(
- new DefaultAsyncHttpClientConfig.Builder()
- .setConnectTimeout(connectionTimeout)
- .build()
- )
-}
-
-object EmptyPublisher extends Publisher[ByteBuffer] {
- override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {
- s.onComplete()
- }
-}