aboutsummaryrefslogtreecommitdiff
path: root/async-http-client-handler
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-07-24 12:18:27 +0200
committeradamw <adam@warski.org>2017-07-24 12:18:27 +0200
commitccd2c4b1d53bf68e04ff1f8bca032d870494d9a8 (patch)
treee298b14664b07dc9aab54f74abe956fb797fe1bb /async-http-client-handler
parentfef16dd2dbd0f53ee7432ab2ff39255279932ac4 (diff)
downloadsttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.gz
sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.bz2
sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.zip
Better responseAs mapping, done on the client thread pool
Diffstat (limited to 'async-http-client-handler')
-rw-r--r--async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala25
-rw-r--r--async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala17
-rw-r--r--async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala17
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala59
4 files changed, 79 insertions, 39 deletions
diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
index 59119d7..41fcf68 100644
--- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
@@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.future
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
- WrapperFromAsync
+ MonadAsyncError
}
import org.asynchttpclient.{
AsyncHttpClient,
@@ -10,18 +10,27 @@ import org.asynchttpclient.{
DefaultAsyncHttpClient
}
-import scala.concurrent.{Future, Promise}
+import scala.concurrent.{ExecutionContext, Future, Promise}
-class FutureAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)
- extends AsyncHttpClientHandler[Future](asyncHttpClient, FutureFromAsync) {
+class FutureAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ extends AsyncHttpClientHandler[Future](asyncHttpClient, new FutureMonad()) {
def this() = this(new DefaultAsyncHttpClient())
def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg))
}
-private[asynchttpclient] object FutureFromAsync
- extends WrapperFromAsync[Future] {
- override def apply[T](
+private[future] class FutureMonad(implicit ec: ExecutionContext)
+ extends MonadAsyncError[Future] {
+ override def unit[T](t: T): Future[T] = Future.successful(t)
+
+ override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Future[T],
+ f: (T) => Future[T2]): Future[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = {
val p = Promise[T]()
register {
@@ -30,4 +39,6 @@ private[asynchttpclient] object FutureFromAsync
}
p.future
}
+
+ override def error[T](t: Throwable): Future[T] = Future.failed(t)
}
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
index de0c139..30106f2 100644
--- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
@@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.monix
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
- WrapperFromAsync
+ MonadAsyncError
}
import monix.eval.Task
import monix.execution.Cancelable
@@ -15,14 +15,21 @@ import org.asynchttpclient.{
import scala.util.{Failure, Success}
class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)
- extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskFromAsync) {
+ extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) {
def this() = this(new DefaultAsyncHttpClient())
def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg))
}
-private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] {
- override def apply[T](
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
Task.async { (_, cb) =>
register {
@@ -32,4 +39,6 @@ private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] {
Cancelable.empty
}
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
}
diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
index ab2e261..57d65c6 100644
--- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
@@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.scalaz
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
- WrapperFromAsync
+ MonadAsyncError
}
import org.asynchttpclient.{
AsyncHttpClient,
@@ -14,14 +14,21 @@ import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
class ScalazAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)
- extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskFromAsync) {
+ extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) {
def this() = this(new DefaultAsyncHttpClient())
def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg))
}
-private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] {
- override def apply[T](
+private[scalaz] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.point(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
Task.async { cb =>
register {
@@ -29,4 +36,6 @@ private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] {
case Right(t) => cb(\/-(t))
}
}
+
+ override def error[T](t: Throwable): Task[T] = Task.fail(t)
}
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
index 98160c5..1683c3d 100644
--- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
+++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
@@ -16,11 +16,11 @@ import scala.collection.JavaConverters._
import scala.language.higherKinds
class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
- wrapper: WrapperFromAsync[R])
+ rm: MonadAsyncError[R])
extends SttpHandler[R, Nothing] {
override def send[T](r: Request[T, Nothing]): R[Response[T]] = {
- wrapper { cb =>
+ rm.flatten(rm.async[R[Response[T]]] { cb =>
asyncHttpClient
.prepareRequest(requestToAsync(r))
.execute(new AsyncCompletionHandler[AsyncResponse] {
@@ -30,7 +30,7 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
}
override def onThrowable(t: Throwable): Unit = cb(Left(t))
})
- }
+ })
}
private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = {
@@ -70,44 +70,55 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
private def readResponse[T](
response: AsyncResponse,
- responseAs: ResponseAs[T, Nothing]): Response[T] = {
- Response(readResponseBody(response, responseAs),
- response.getStatusCode,
- response.getHeaders
- .iterator()
- .asScala
- .map(e => (e.getKey, e.getValue))
- .toList)
+ responseAs: ResponseAs[T, Nothing]): R[Response[T]] = {
+ val body = readResponseBody(response, responseAs)
+ rm.map(body,
+ Response(_: T,
+ response.getStatusCode,
+ response.getHeaders
+ .iterator()
+ .asScala
+ .map(e => (e.getKey, e.getValue))
+ .toList))
}
private def readResponseBody[T](response: AsyncResponse,
- responseAs: ResponseAs[T, Nothing]): T = {
+ responseAs: ResponseAs[T, Nothing]): R[T] = {
def asString(enc: String) = response.getResponseBody(Charset.forName(enc))
responseAs match {
- case IgnoreResponse(g) =>
+ case MappedResponseAs(raw, g) =>
+ rm.map(readResponseBody(response, raw), g)
+
+ case IgnoreResponse =>
// getting the body and discarding it
response.getResponseBodyAsBytes
- g(())
+ rm.unit(())
- case ResponseAsString(enc, g) =>
- g(asString(enc))
+ case ResponseAsString(enc) =>
+ rm.unit(asString(enc))
- case ResponseAsByteArray(g) =>
- g(response.getResponseBodyAsBytes)
+ case ResponseAsByteArray =>
+ rm.unit(response.getResponseBodyAsBytes)
- case r @ ResponseAsParams(enc, g) =>
- g(r.parse(asString(enc)))
+ case r @ ResponseAsParams(enc) =>
+ rm.unit(r.parse(asString(enc)))
- case ResponseAsStream(_) =>
+ case ResponseAsStream() =>
// only possible when the user requests the response as a stream of
// Nothing. Oh well ...
- throw new IllegalStateException()
+ rm.error(new IllegalStateException())
}
}
}
-trait WrapperFromAsync[R[_]] {
- def apply[T](register: (Either[Throwable, T] => Unit) => Unit): R[T]
+trait MonadAsyncError[R[_]] {
+ def unit[T](t: T): R[T]
+ def map[T, T2](fa: R[T], f: T => T2): R[T2]
+ def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2]
+ def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T]
+ def error[T](t: Throwable): R[T]
+
+ def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity)
}