diff options
author | adamw <adam@warski.org> | 2017-07-24 16:57:51 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-07-24 16:57:51 +0200 |
commit | b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6 (patch) | |
tree | 6ef95abd69930cabb5b7566507af6dc56d25ebaf /async-http-client-handler/src | |
parent | 95fee5083274bf0e856af8b868702f8965b92f1a (diff) | |
download | sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.gz sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.bz2 sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.zip |
Adding streaming to the monix async http client handler
Diffstat (limited to 'async-http-client-handler/src')
-rw-r--r-- | async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala | 181 |
1 files changed, 146 insertions, 35 deletions
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index 1683c3d..f89c85a 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -1,46 +1,143 @@ package com.softwaremill.sttp.asynchttpclient +import java.nio.ByteBuffer import java.nio.charset.Charset import com.softwaremill.sttp.model._ -import com.softwaremill.sttp.{Request, Response, SttpHandler} +import com.softwaremill.sttp.{ + Request, + Response, + SttpHandler, + ContentLengthHeader +} +import org.asynchttpclient.AsyncHandler.State +import org.asynchttpclient.handler.StreamedAsyncHandler import org.asynchttpclient.{ AsyncCompletionHandler, + AsyncHandler, AsyncHttpClient, + HttpResponseBodyPart, + HttpResponseHeaders, + HttpResponseStatus, RequestBuilder, Request => AsyncRequest, Response => AsyncResponse } +import org.reactivestreams.{Publisher, Subscriber, Subscription} import scala.collection.JavaConverters._ import scala.language.higherKinds -class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, - rm: MonadAsyncError[R]) - extends SttpHandler[R, Nothing] { +abstract class AsyncHttpClientHandler[R[_], S]( + asyncHttpClient: AsyncHttpClient, + rm: MonadAsyncError[R]) + extends SttpHandler[R, S] { + + override def send[T](r: Request[T, S]): R[Response[T]] = { + val preparedRequest = asyncHttpClient + .prepareRequest(requestToAsync(r)) - override def send[T](r: Request[T, Nothing]): R[Response[T]] = { rm.flatten(rm.async[R[Response[T]]] { cb => - asyncHttpClient - .prepareRequest(requestToAsync(r)) - .execute(new AsyncCompletionHandler[AsyncResponse] { - override def onCompleted(response: AsyncResponse): AsyncResponse = { - cb(Right(readResponse(response, r.responseAs))) - response - } - override def onThrowable(t: Throwable): Unit = cb(Left(t)) - }) + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) + + r.responseAs match { + case ras @ ResponseAsStream() => + preparedRequest + .execute(streamingAsyncHandler(ras, success, error)) + + case ra => + preparedRequest + .execute(eagerAsyncHandler(ra, success, error)) + } + }) + } - private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = { + protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer] + + protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S + + private def eagerAsyncHandler[T]( + responseAs: ResponseAs[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { + + new AsyncCompletionHandler[Unit] { + override def onCompleted(response: AsyncResponse): Unit = + success(readEagerResponse(response, responseAs)) + + override def onThrowable(t: Throwable): Unit = error(t) + } + } + + private def streamingAsyncHandler[T]( + responseAs: ResponseAsStream[T, S], + success: R[Response[T]] => Unit, + error: Throwable => Unit): AsyncHandler[Unit] = { + new StreamedAsyncHandler[Unit] { + private val builder = new AsyncResponse.ResponseBuilder() + private var publisher: Option[Publisher[ByteBuffer]] = None + + override def onStream( + p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = { + // Sadly we don't have .map on Publisher + publisher = Some(new Publisher[ByteBuffer] { + override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = + p.subscribe(new Subscriber[HttpResponseBodyPart] { + override def onError(t: Throwable): Unit = s.onError(t) + override def onComplete(): Unit = s.onComplete() + override def onNext(t: HttpResponseBodyPart): Unit = + s.onNext(t.getBodyByteBuffer) + override def onSubscribe(v: Subscription): Unit = + s.onSubscribe(v) + }) + }) + State.CONTINUE + } + + override def onBodyPartReceived( + bodyPart: HttpResponseBodyPart): AsyncHandler.State = + throw new IllegalStateException( + "Requested a streaming handler, unexpected eager body parts.") + + override def onHeadersReceived( + headers: HttpResponseHeaders): AsyncHandler.State = { + builder.accumulate(headers) + State.CONTINUE + } + + override def onStatusReceived( + responseStatus: HttpResponseStatus): AsyncHandler.State = { + builder.accumulate(responseStatus) + State.CONTINUE + } + + override def onCompleted(): Unit = { + val baseResponse = readResponseNoBody(builder.build()) + val p = publisher.getOrElse(EmptyPublisher) + val s = publisherToStreamBody(p) + val t = responseAs.responseIsStream(s) + success(rm.unit(baseResponse.copy(body = t))) + } + + override def onThrowable(t: Throwable): Unit = { + error(t) + } + } + } + + private def requestToAsync(r: Request[_, S]): AsyncRequest = { val rb = new RequestBuilder(r.method.m).setUrl(r.uri.toString) r.headers.foreach { case (k, v) => rb.setHeader(k, v) } - setBody(r.body, rb) + setBody(r, r.body, rb) rb.build() } - private def setBody(body: RequestBody[Nothing], rb: RequestBuilder): Unit = { + private def setBody(r: Request[_, S], + body: RequestBody[S], + rb: RequestBuilder): Unit = { body match { case NoBody => // skip @@ -60,36 +157,42 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, rb.setBody(b.toFile) case SerializableBody(f, t) => - setBody(f(t), rb) + setBody(r, f(t), rb) case StreamBody(s) => - // we have an instance of nothing - everything's possible! - s + val cl = r.headers + .find(_._1.equalsIgnoreCase(ContentLengthHeader)) + .map(_._2.toLong) + .getOrElse(-1L) + rb.setBody(streamBodyToPublisher(s), cl) } } - private def readResponse[T]( + private def readEagerResponse[T]( response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): R[Response[T]] = { - val body = readResponseBody(response, responseAs) - rm.map(body, - Response(_: T, - response.getStatusCode, - response.getHeaders - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toList)) + responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = readEagerResponseBody(response, responseAs) + rm.map(body, (b: T) => readResponseNoBody(response).copy(body = b)) } - private def readResponseBody[T](response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): R[T] = { + private def readResponseNoBody(response: AsyncResponse): Response[Unit] = { + Response((), + response.getStatusCode, + response.getHeaders + .iterator() + .asScala + .map(e => (e.getKey, e.getValue)) + .toList) + } + + private def readEagerResponseBody[T](response: AsyncResponse, + responseAs: ResponseAs[T, S]): R[T] = { def asString(enc: String) = response.getResponseBody(Charset.forName(enc)) responseAs match { case MappedResponseAs(raw, g) => - rm.map(readResponseBody(response, raw), g) + rm.map(readEagerResponseBody(response, raw), g) case IgnoreResponse => // getting the body and discarding it @@ -108,7 +211,9 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, case ResponseAsStream() => // only possible when the user requests the response as a stream of // Nothing. Oh well ... - rm.error(new IllegalStateException()) + rm.error( + new IllegalStateException( + "Requested a streaming response, trying to read eagerly.")) } } } @@ -122,3 +227,9 @@ trait MonadAsyncError[R[_]] { def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) } + +object EmptyPublisher extends Publisher[ByteBuffer] { + override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = { + s.onComplete() + } +} |