diff options
author | adamw <adam@warski.org> | 2017-07-24 12:18:27 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-07-24 12:18:27 +0200 |
commit | ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8 (patch) | |
tree | e298b14664b07dc9aab54f74abe956fb797fe1bb /async-http-client-handler | |
parent | fef16dd2dbd0f53ee7432ab2ff39255279932ac4 (diff) | |
download | sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.gz sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.bz2 sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.zip |
Better responseAs mapping, done on the client thread pool
Diffstat (limited to 'async-http-client-handler')
4 files changed, 79 insertions, 39 deletions
diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala index 59119d7..41fcf68 100644 --- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala +++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala @@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.future import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, - WrapperFromAsync + MonadAsyncError } import org.asynchttpclient.{ AsyncHttpClient, @@ -10,18 +10,27 @@ import org.asynchttpclient.{ DefaultAsyncHttpClient } -import scala.concurrent.{Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} -class FutureAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Future](asyncHttpClient, FutureFromAsync) { +class FutureAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + extends AsyncHttpClientHandler[Future](asyncHttpClient, new FutureMonad()) { def this() = this(new DefaultAsyncHttpClient()) def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) } -private[asynchttpclient] object FutureFromAsync - extends WrapperFromAsync[Future] { - override def apply[T]( +private[future] class FutureMonad(implicit ec: ExecutionContext) + extends MonadAsyncError[Future] { + override def unit[T](t: T): Future[T] = Future.successful(t) + + override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Future[T], + f: (T) => Future[T2]): Future[T2] = + fa.flatMap(f) + + override def async[T]( register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = { val p = Promise[T]() register { @@ -30,4 +39,6 @@ private[asynchttpclient] object FutureFromAsync } p.future } + + override def error[T](t: Throwable): Future[T] = Future.failed(t) } diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala index de0c139..30106f2 100644 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala @@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.monix import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, - WrapperFromAsync + MonadAsyncError } import monix.eval.Task import monix.execution.Cancelable @@ -15,14 +15,21 @@ import org.asynchttpclient.{ import scala.util.{Failure, Success} class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskFromAsync) { + extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { def this() = this(new DefaultAsyncHttpClient()) def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) } -private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { - override def apply[T]( +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = Task.async { (_, cb) => register { @@ -32,4 +39,6 @@ private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { Cancelable.empty } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) } diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala index ab2e261..57d65c6 100644 --- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala +++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala @@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.scalaz import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, - WrapperFromAsync + MonadAsyncError } import org.asynchttpclient.{ AsyncHttpClient, @@ -14,14 +14,21 @@ import scalaz.{-\/, \/-} import scalaz.concurrent.Task class ScalazAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskFromAsync) { + extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { def this() = this(new DefaultAsyncHttpClient()) def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) } -private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { - override def apply[T]( +private[scalaz] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.point(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = Task.async { cb => register { @@ -29,4 +36,6 @@ private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { case Right(t) => cb(\/-(t)) } } + + override def error[T](t: Throwable): Task[T] = Task.fail(t) } diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index 98160c5..1683c3d 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -16,11 +16,11 @@ import scala.collection.JavaConverters._ import scala.language.higherKinds class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, - wrapper: WrapperFromAsync[R]) + rm: MonadAsyncError[R]) extends SttpHandler[R, Nothing] { override def send[T](r: Request[T, Nothing]): R[Response[T]] = { - wrapper { cb => + rm.flatten(rm.async[R[Response[T]]] { cb => asyncHttpClient .prepareRequest(requestToAsync(r)) .execute(new AsyncCompletionHandler[AsyncResponse] { @@ -30,7 +30,7 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, } override def onThrowable(t: Throwable): Unit = cb(Left(t)) }) - } + }) } private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = { @@ -70,44 +70,55 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, private def readResponse[T]( response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): Response[T] = { - Response(readResponseBody(response, responseAs), - response.getStatusCode, - response.getHeaders - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toList) + responseAs: ResponseAs[T, Nothing]): R[Response[T]] = { + val body = readResponseBody(response, responseAs) + rm.map(body, + Response(_: T, + response.getStatusCode, + response.getHeaders + .iterator() + .asScala + .map(e => (e.getKey, e.getValue)) + .toList)) } private def readResponseBody[T](response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): T = { + responseAs: ResponseAs[T, Nothing]): R[T] = { def asString(enc: String) = response.getResponseBody(Charset.forName(enc)) responseAs match { - case IgnoreResponse(g) => + case MappedResponseAs(raw, g) => + rm.map(readResponseBody(response, raw), g) + + case IgnoreResponse => // getting the body and discarding it response.getResponseBodyAsBytes - g(()) + rm.unit(()) - case ResponseAsString(enc, g) => - g(asString(enc)) + case ResponseAsString(enc) => + rm.unit(asString(enc)) - case ResponseAsByteArray(g) => - g(response.getResponseBodyAsBytes) + case ResponseAsByteArray => + rm.unit(response.getResponseBodyAsBytes) - case r @ ResponseAsParams(enc, g) => - g(r.parse(asString(enc))) + case r @ ResponseAsParams(enc) => + rm.unit(r.parse(asString(enc))) - case ResponseAsStream(_) => + case ResponseAsStream() => // only possible when the user requests the response as a stream of // Nothing. Oh well ... - throw new IllegalStateException() + rm.error(new IllegalStateException()) } } } -trait WrapperFromAsync[R[_]] { - def apply[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] +trait MonadAsyncError[R[_]] { + def unit[T](t: T): R[T] + def map[T, T2](fa: R[T], f: T => T2): R[T2] + def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2] + def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] + def error[T](t: Throwable): R[T] + + def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) } |