aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-handler/src
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-07-24 12:18:27 +0200
committeradamw <adam@warski.org>2017-07-24 12:18:27 +0200
commitccd2c4b1d53bf68e04ff1f8bca032d870494d9a8 (patch)
treee298b14664b07dc9aab54f74abe956fb797fe1bb /async-http-client-handler/src
parentfef16dd2dbd0f53ee7432ab2ff39255279932ac4 (diff)
downloadsttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.gz
sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.bz2
sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.zip
Better responseAs mapping, done on the client thread pool
Diffstat (limited to 'async-http-client-handler/src')
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala59
1 files changed, 35 insertions, 24 deletions
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
index 98160c5..1683c3d 100644
--- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
+++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
@@ -16,11 +16,11 @@ import scala.collection.JavaConverters._
import scala.language.higherKinds
class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
- wrapper: WrapperFromAsync[R])
+ rm: MonadAsyncError[R])
extends SttpHandler[R, Nothing] {
override def send[T](r: Request[T, Nothing]): R[Response[T]] = {
- wrapper { cb =>
+ rm.flatten(rm.async[R[Response[T]]] { cb =>
asyncHttpClient
.prepareRequest(requestToAsync(r))
.execute(new AsyncCompletionHandler[AsyncResponse] {
@@ -30,7 +30,7 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
}
override def onThrowable(t: Throwable): Unit = cb(Left(t))
})
- }
+ })
}
private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = {
@@ -70,44 +70,55 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
private def readResponse[T](
response: AsyncResponse,
- responseAs: ResponseAs[T, Nothing]): Response[T] = {
- Response(readResponseBody(response, responseAs),
- response.getStatusCode,
- response.getHeaders
- .iterator()
- .asScala
- .map(e => (e.getKey, e.getValue))
- .toList)
+ responseAs: ResponseAs[T, Nothing]): R[Response[T]] = {
+ val body = readResponseBody(response, responseAs)
+ rm.map(body,
+ Response(_: T,
+ response.getStatusCode,
+ response.getHeaders
+ .iterator()
+ .asScala
+ .map(e => (e.getKey, e.getValue))
+ .toList))
}
private def readResponseBody[T](response: AsyncResponse,
- responseAs: ResponseAs[T, Nothing]): T = {
+ responseAs: ResponseAs[T, Nothing]): R[T] = {
def asString(enc: String) = response.getResponseBody(Charset.forName(enc))
responseAs match {
- case IgnoreResponse(g) =>
+ case MappedResponseAs(raw, g) =>
+ rm.map(readResponseBody(response, raw), g)
+
+ case IgnoreResponse =>
// getting the body and discarding it
response.getResponseBodyAsBytes
- g(())
+ rm.unit(())
- case ResponseAsString(enc, g) =>
- g(asString(enc))
+ case ResponseAsString(enc) =>
+ rm.unit(asString(enc))
- case ResponseAsByteArray(g) =>
- g(response.getResponseBodyAsBytes)
+ case ResponseAsByteArray =>
+ rm.unit(response.getResponseBodyAsBytes)
- case r @ ResponseAsParams(enc, g) =>
- g(r.parse(asString(enc)))
+ case r @ ResponseAsParams(enc) =>
+ rm.unit(r.parse(asString(enc)))
- case ResponseAsStream(_) =>
+ case ResponseAsStream() =>
// only possible when the user requests the response as a stream of
// Nothing. Oh well ...
- throw new IllegalStateException()
+ rm.error(new IllegalStateException())
}
}
}
-trait WrapperFromAsync[R[_]] {
- def apply[T](register: (Either[Throwable, T] => Unit) => Unit): R[T]
+trait MonadAsyncError[R[_]] {
+ def unit[T](t: T): R[T]
+ def map[T, T2](fa: R[T], f: T => T2): R[T2]
+ def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2]
+ def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T]
+ def error[T](t: Throwable): R[T]
+
+ def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity)
}