From ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8 Mon Sep 17 00:00:00 2001 From: adamw Date: Mon, 24 Jul 2017 12:18:27 +0200 Subject: Better responseAs mapping, done on the client thread pool --- .../asynchttpclient/AsyncHttpClientHandler.scala | 59 +++++++++++++--------- 1 file changed, 35 insertions(+), 24 deletions(-) (limited to 'async-http-client-handler/src') diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index 98160c5..1683c3d 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -16,11 +16,11 @@ import scala.collection.JavaConverters._ import scala.language.higherKinds class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, - wrapper: WrapperFromAsync[R]) + rm: MonadAsyncError[R]) extends SttpHandler[R, Nothing] { override def send[T](r: Request[T, Nothing]): R[Response[T]] = { - wrapper { cb => + rm.flatten(rm.async[R[Response[T]]] { cb => asyncHttpClient .prepareRequest(requestToAsync(r)) .execute(new AsyncCompletionHandler[AsyncResponse] { @@ -30,7 +30,7 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, } override def onThrowable(t: Throwable): Unit = cb(Left(t)) }) - } + }) } private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = { @@ -70,44 +70,55 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, private def readResponse[T]( response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): Response[T] = { - Response(readResponseBody(response, responseAs), - response.getStatusCode, - response.getHeaders - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toList) + responseAs: ResponseAs[T, Nothing]): R[Response[T]] = { + val body = readResponseBody(response, responseAs) + rm.map(body, + Response(_: T, + response.getStatusCode, + response.getHeaders + .iterator() + .asScala + .map(e => (e.getKey, e.getValue)) + .toList)) } private def readResponseBody[T](response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): T = { + responseAs: ResponseAs[T, Nothing]): R[T] = { def asString(enc: String) = response.getResponseBody(Charset.forName(enc)) responseAs match { - case IgnoreResponse(g) => + case MappedResponseAs(raw, g) => + rm.map(readResponseBody(response, raw), g) + + case IgnoreResponse => // getting the body and discarding it response.getResponseBodyAsBytes - g(()) + rm.unit(()) - case ResponseAsString(enc, g) => - g(asString(enc)) + case ResponseAsString(enc) => + rm.unit(asString(enc)) - case ResponseAsByteArray(g) => - g(response.getResponseBodyAsBytes) + case ResponseAsByteArray => + rm.unit(response.getResponseBodyAsBytes) - case r @ ResponseAsParams(enc, g) => - g(r.parse(asString(enc))) + case r @ ResponseAsParams(enc) => + rm.unit(r.parse(asString(enc))) - case ResponseAsStream(_) => + case ResponseAsStream() => // only possible when the user requests the response as a stream of // Nothing. Oh well ... - throw new IllegalStateException() + rm.error(new IllegalStateException()) } } } -trait WrapperFromAsync[R[_]] { - def apply[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] +trait MonadAsyncError[R[_]] { + def unit[T](t: T): R[T] + def map[T, T2](fa: R[T], f: T => T2): R[T2] + def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2] + def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] + def error[T](t: Throwable): R[T] + + def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) } -- cgit v1.2.3