From b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6 Mon Sep 17 00:00:00 2001 From: adamw Date: Mon, 24 Jul 2017 16:57:51 +0200 Subject: Adding streaming to the monix async http client handler --- .../future/FutureAsyncHttpClientHandler.scala | 32 +++- .../monix/MonixAsyncHttpClientHandler.scala | 49 +++++- .../scalaz/ScalazAsyncHttpClientHandler.scala | 13 +- .../asynchttpclient/AsyncHttpClientHandler.scala | 181 +++++++++++++++++---- 4 files changed, 230 insertions(+), 45 deletions(-) (limited to 'async-http-client-handler') diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala index adc679e..a2e49a2 100644 --- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala +++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala @@ -1,5 +1,7 @@ package com.softwaremill.sttp.asynchttpclient.future +import java.nio.ByteBuffer + import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, MonadAsyncError @@ -9,22 +11,50 @@ import org.asynchttpclient.{ AsyncHttpClientConfig, DefaultAsyncHttpClient } +import org.reactivestreams.Publisher import scala.concurrent.{ExecutionContext, Future, Promise} class FutureAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)( implicit ec: ExecutionContext) - extends AsyncHttpClientHandler[Future](asyncHttpClient, new FutureMonad()) + extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient, + new FutureMonad()) { + + override protected def streamBodyToPublisher( + s: Nothing): Publisher[ByteBuffer] = s // nothing is everything + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Nothing = + throw new IllegalStateException("This handler does not support streaming") +} object FutureAsyncHttpClientHandler { + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ def apply()( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) : FutureAsyncHttpClientHandler = new FutureAsyncHttpClientHandler(new DefaultAsyncHttpClient()) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ def usingConfig(cfg: AsyncHttpClientConfig)( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) : FutureAsyncHttpClientHandler = new FutureAsyncHttpClientHandler(new DefaultAsyncHttpClient()) + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) : FutureAsyncHttpClientHandler = diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala index fab9c98..c77e6d9 100644 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala @@ -1,32 +1,65 @@ package com.softwaremill.sttp.asynchttpclient.monix +import java.nio.ByteBuffer + import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, MonadAsyncError } import monix.eval.Task -import monix.execution.Cancelable +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.Observable import org.asynchttpclient.{ AsyncHttpClient, AsyncHttpClientConfig, DefaultAsyncHttpClient } +import org.reactivestreams.Publisher import scala.util.{Failure, Success} -class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { +class MonixAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)( + implicit scheduler: Scheduler) + extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]]( + asyncHttpClient, + TaskMonad) { + + override protected def streamBodyToPublisher( + s: Observable[ByteBuffer]): Publisher[ByteBuffer] = { + s.toReactivePublisher + } - def this() = this(new DefaultAsyncHttpClient()) - def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Observable[ByteBuffer] = + Observable.fromReactivePublisher(p) } object MonixAsyncHttpClientHandler { - def apply(): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def apply()(implicit s: Scheduler = Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient()) - def usingConfig(cfg: AsyncHttpClientConfig): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler = + Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient()) - def usingClient(client: AsyncHttpClient): MonixAsyncHttpClientHandler = + + /** + * @param s The scheduler used for streaming request bodies. Defaults to the + * global scheduler. + */ + def usingClient(client: AsyncHttpClient)(implicit s: Scheduler = + Scheduler.Implicits.global) + : MonixAsyncHttpClientHandler = new MonixAsyncHttpClientHandler(client) } diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala index cb3bdef..0460fff 100644 --- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala +++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala @@ -1,5 +1,7 @@ package com.softwaremill.sttp.asynchttpclient.scalaz +import java.nio.ByteBuffer + import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, MonadAsyncError @@ -9,12 +11,21 @@ import org.asynchttpclient.{ AsyncHttpClientConfig, DefaultAsyncHttpClient } +import org.reactivestreams.Publisher import scalaz.{-\/, \/-} import scalaz.concurrent.Task class ScalazAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) + extends AsyncHttpClientHandler[Task, Nothing](asyncHttpClient, TaskMonad) { + + override protected def streamBodyToPublisher( + s: Nothing): Publisher[ByteBuffer] = s // nothing is everything + + override protected def publisherToStreamBody( + p: Publisher[ByteBuffer]): Nothing = + throw new IllegalStateException("This handler does not support streaming") +} object ScalazAsyncHttpClientHandler { def apply(): ScalazAsyncHttpClientHandler = diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index 1683c3d..f89c85a 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -1,46 +1,143 @@ package com.softwaremill.sttp.asynchttpclient +import java.nio.ByteBuffer import java.nio.charset.Charset import com.softwaremill.sttp.model._ -import com.softwaremill.sttp.{Request, Response, SttpHandler} +import com.softwaremill.sttp.{ + Request, + Response, + SttpHandler, + ContentLengthHeader +} +import org.asynchttpclient.AsyncHandler.State +import org.asynchttpclient.handler.StreamedAsyncHandler import org.asynchttpclient.{ AsyncCompletionHandler, + AsyncHandler, AsyncHttpClient, + HttpResponseBodyPart, + HttpResponseHeaders, + HttpResponseStatus, RequestBuilder, Request => AsyncRequest, Response => AsyncResponse } +import org.reactivestreams.{Publisher, Subscriber, Subscription} import scala.collection.JavaConverters._ import scala.language.higherKinds -class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, - rm: MonadAsyncError[R]) - extends SttpHandler[R, Nothing] { +abstract class AsyncHttpClientHandler[R[_], S]( + asyncHttpClient: AsyncHttpClient, + rm: MonadAsyncError[R]) + extends SttpHandler[R, S] { + + override def send[T](r: Request[T, S]): R[Response[T]] = { + val preparedRequest = asyncHttpClient + .prepareRequest(requestToAsync(r)) - override def send[T](r: Request[T, Nothing]): R[Response[T]] = { rm.flatten(rm.async[R[Response[T]]] { cb => - asyncHttpClient - .prepareRequest(requestToAsync(r)) - .execute(new AsyncCompletionHandler[AsyncResponse] { - override def onCompleted(response: AsyncResponse): AsyncResponse = { - cb(Right(readResponse(response, r.responseAs))) - response - } - override def onThrowable(t: Throwable): Unit = cb(Left(t)) - }) + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) + + r.responseAs match { + case ras @ ResponseAsStream() => + preparedRequest + .execute(streamingAsyncHandler(ras, success, error)) + + case ra => + preparedRequest + .execute(eagerAsyncHandler(ra, success, error)) + } + }) + } - private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = { + protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer] + + protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S + + private def eagerAsyncHandler[T]( + responseAs: ResponseAs[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { + + new AsyncCompletionHandler[Unit] { + override def onCompleted(response: AsyncResponse): Unit = + success(readEagerResponse(response, responseAs)) + + override def onThrowable(t: Throwable): Unit = error(t) + } + } + + private def streamingAsyncHandler[T]( + responseAs: ResponseAsStream[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { + new StreamedAsyncHandler[Unit] { + private val builder = new AsyncResponse.ResponseBuilder() + private var publisher: Option[Publisher[ByteBuffer]] = None + + override def onStream( + p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { + // Sadly we don't have .map on Publisher + publisher = Some(new Publisher[ByteBuffer] { + override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = + p.subscribe(new Subscriber[HttpResponseBodyPart] { + override def onError(t: Throwable): Unit = s.onError(t) + override def onComplete(): Unit = s.onComplete() + override def onNext(t: HttpResponseBodyPart): Unit = + s.onNext(t.getBodyByteBuffer) + override def onSubscribe(v: Subscription): Unit = + s.onSubscribe(v) + }) + }) + State.CONTINUE + } + + override def onBodyPartReceived( + bodyPart: HttpResponseBodyPart): AsyncHandler.State = + throw new IllegalStateException( + "Requested a streaming handler, unexpected eager body parts.") + + override def onHeadersReceived( + headers: HttpResponseHeaders): AsyncHandler.State = { + builder.accumulate(headers) + State.CONTINUE + } + + override def onStatusReceived( + responseStatus: HttpResponseStatus): AsyncHandler.State = { + builder.accumulate(responseStatus) + State.CONTINUE + } + + override def onCompleted(): Unit = { + val baseResponse = readResponseNoBody(builder.build()) + val p = publisher.getOrElse(EmptyPublisher) + val s = publisherToStreamBody(p) + val t = responseAs.responseIsStream(s) + success(rm.unit(baseResponse.copy(body = t))) + } + + override def onThrowable(t: Throwable): Unit = { + error(t) + } + } + } + + private def requestToAsync(r: Request[_, S]): AsyncRequest = { val rb = new RequestBuilder(r.method.m).setUrl(r.uri.toString) r.headers.foreach { case (k, v) => rb.setHeader(k, v) } - setBody(r.body, rb) + setBody(r, r.body, rb) rb.build() } - private def setBody(body: RequestBody[Nothing], rb: RequestBuilder): Unit = { + private def setBody(r: Request[_, S], + body: RequestBody[S], + rb: RequestBuilder): Unit = { body match { case NoBody => // skip @@ -60,36 +157,42 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, rb.setBody(b.toFile) case SerializableBody(f, t) => - setBody(f(t), rb) + setBody(r, f(t), rb) case StreamBody(s) => - // we have an instance of nothing - everything's possible! - s + val cl = r.headers + .find(_._1.equalsIgnoreCase(ContentLengthHeader)) + .map(_._2.toLong) + .getOrElse(-1L) + rb.setBody(streamBodyToPublisher(s), cl) } } - private def readResponse[T]( + private def readEagerResponse[T]( response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): R[Response[T]] = { - val body = readResponseBody(response, responseAs) - rm.map(body, - Response(_: T, - response.getStatusCode, - response.getHeaders - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toList)) + responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = readEagerResponseBody(response, responseAs) + rm.map(body, (b: T) => readResponseNoBody(response).copy(body = b)) } - private def readResponseBody[T](response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): R[T] = { + private def readResponseNoBody(response: AsyncResponse): Response[Unit] = { + Response((), + response.getStatusCode, + response.getHeaders + .iterator() + .asScala + .map(e => (e.getKey, e.getValue)) + .toList) + } + + private def readEagerResponseBody[T](response: AsyncResponse, + responseAs: ResponseAs[T, S]): R[T] = { def asString(enc: String) = response.getResponseBody(Charset.forName(enc)) responseAs match { case MappedResponseAs(raw, g) => - rm.map(readResponseBody(response, raw), g) + rm.map(readEagerResponseBody(response, raw), g) case IgnoreResponse => // getting the body and discarding it @@ -108,7 +211,9 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, case ResponseAsStream() => // only possible when the user requests the response as a stream of // Nothing. Oh well ... - rm.error(new IllegalStateException()) + rm.error( + new IllegalStateException( + "Requested a streaming response, trying to read eagerly.")) } } } @@ -122,3 +227,9 @@ trait MonadAsyncError[R[_]] { def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) } + +object EmptyPublisher extends Publisher[ByteBuffer] { + override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = { + s.onComplete() + } +} -- cgit v1.2.3