diff options
author | adamw <adam@warski.org> | 2017-08-03 11:38:23 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-08-03 11:38:23 +0200 |
commit | c07f98349ac5dc646855d00425a9dc2c3324465e (patch) | |
tree | 6f07784a68290b406a3b50a59fac36b83c5df355 | |
parent | 90615deb20ce43f09371b10a25628be8d68485d8 (diff) | |
download | sttp-c07f98349ac5dc646855d00425a9dc2c3324465e.tar.gz sttp-c07f98349ac5dc646855d00425a9dc2c3324465e.tar.bz2 sttp-c07f98349ac5dc646855d00425a9dc2c3324465e.zip |
Making the response monad a top-level concept, to make it possible to write SttpHandler wrappers.
9 files changed, 92 insertions, 69 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index c096aa4..d4ca3d8 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -43,6 +43,8 @@ class AkkaHttpSttpHandler private (actorSystem: ActorSystem, } } + override def responseMonad: MonadError[Future] = new FutureMonad()(ec) + private def methodToAkka(m: Method): HttpMethod = m match { case Method.GET => HttpMethods.GET case Method.HEAD => HttpMethods.HEAD diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala index 5d71511..3209e9b 100644 --- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala +++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala @@ -2,11 +2,8 @@ package com.softwaremill.sttp.asynchttpclient.future import java.nio.ByteBuffer -import com.softwaremill.sttp.SttpHandler -import com.softwaremill.sttp.asynchttpclient.{ - AsyncHttpClientHandler, - MonadAsyncError -} +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler +import com.softwaremill.sttp.{FutureMonad, SttpHandler} import org.asynchttpclient.{ AsyncHttpClient, AsyncHttpClientConfig, @@ -14,13 +11,13 @@ import org.asynchttpclient.{ } import org.reactivestreams.Publisher -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} class FutureAsyncHttpClientHandler private ( asyncHttpClient: AsyncHttpClient, closeClient: Boolean)(implicit ec: ExecutionContext) extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient, - new FutureMonad(), + new FutureMonad, closeClient) { override protected def streamBodyToPublisher( @@ -64,25 +61,3 @@ object FutureAsyncHttpClientHandler { : SttpHandler[Future, Nothing] = new FutureAsyncHttpClientHandler(client, closeClient = false) } - -private[future] class FutureMonad(implicit ec: ExecutionContext) - extends MonadAsyncError[Future] { - override def unit[T](t: T): Future[T] = Future.successful(t) - - override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Future[T], f: (T) => Future[T2]): Future[T2] = - fa.flatMap(f) - - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = { - val p = Promise[T]() - register { - case Left(t) => p.failure(t) - case Right(t) => p.success(t) - } - p.future - } - - override def error[T](t: Throwable): Future[T] = Future.failed(t) -} diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala index 2357c06..2407c19 100644 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala @@ -2,11 +2,8 @@ package com.softwaremill.sttp.asynchttpclient.monix import java.nio.ByteBuffer -import com.softwaremill.sttp.SttpHandler -import com.softwaremill.sttp.asynchttpclient.{ - AsyncHttpClientHandler, - MonadAsyncError -} +import com.softwaremill.sttp.{MonadAsyncError, SttpHandler} +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler import monix.eval.Task import monix.execution.{Cancelable, Scheduler} import monix.reactive.Observable diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala index ef06a41..01c3cdb 100644 --- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala +++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala @@ -2,11 +2,8 @@ package com.softwaremill.sttp.asynchttpclient.scalaz import java.nio.ByteBuffer -import com.softwaremill.sttp.SttpHandler -import com.softwaremill.sttp.asynchttpclient.{ - AsyncHttpClientHandler, - MonadAsyncError -} +import com.softwaremill.sttp.{MonadAsyncError, SttpHandler} +import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler import org.asynchttpclient.{ AsyncHttpClient, AsyncHttpClientConfig, diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index 4824b39..f5b4569 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -5,10 +5,12 @@ import java.nio.charset.Charset import com.softwaremill.sttp.model._ import com.softwaremill.sttp.{ + ContentLengthHeader, + MonadAsyncError, + MonadError, Request, Response, - SttpHandler, - ContentLengthHeader + SttpHandler } import org.asynchttpclient.AsyncHandler.State import org.asynchttpclient.handler.StreamedAsyncHandler @@ -50,11 +52,11 @@ abstract class AsyncHttpClientHandler[R[_], S](asyncHttpClient: AsyncHttpClient, preparedRequest .execute(eagerAsyncHandler(ra, success, error)) } - }) - } + override def responseMonad: MonadError[R] = rm + protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer] protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S @@ -234,16 +236,6 @@ abstract class AsyncHttpClientHandler[R[_], S](asyncHttpClient: AsyncHttpClient, } } -trait MonadAsyncError[R[_]] { - def unit[T](t: T): R[T] - def map[T, T2](fa: R[T], f: T => T2): R[T2] - def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2] - def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] - def error[T](t: Throwable): R[T] - - def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) -} - object EmptyPublisher extends Publisher[ByteBuffer] { override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = { s.onComplete() diff --git a/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala index 6c3368a..d5c2ccd 100644 --- a/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala +++ b/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala @@ -33,6 +33,8 @@ object HttpURLConnectionSttpHandler extends SttpHandler[Id, Nothing] { } } + override def responseMonad: MonadError[Id] = IdMonad + private def setBody(body: RequestBody[Nothing], c: HttpURLConnection): Unit = { if (body != NoBody) c.setDoOutput(true) diff --git a/core/src/main/scala/com/softwaremill/sttp/MonadError.scala b/core/src/main/scala/com/softwaremill/sttp/MonadError.scala new file mode 100644 index 0000000..e5e7ab8 --- /dev/null +++ b/core/src/main/scala/com/softwaremill/sttp/MonadError.scala @@ -0,0 +1,44 @@ +package com.softwaremill.sttp + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.language.higherKinds + +trait MonadError[R[_]] { + def unit[T](t: T): R[T] + def map[T, T2](fa: R[T], f: T => T2): R[T2] + def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2] + def error[T](t: Throwable): R[T] + + def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) +} + +trait MonadAsyncError[R[_]] extends MonadError[R] { + def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] +} + +object IdMonad extends MonadError[Id] { + override def unit[T](t: T): Id[T] = t + override def map[T, T2](fa: Id[T], f: (T) => T2): Id[T2] = f(fa) + override def flatMap[T, T2](fa: Id[T], f: (T) => Id[T2]): Id[T2] = f(fa) + override def error[T](t: Throwable): Id[T] = throw t +} + +class FutureMonad(implicit ec: ExecutionContext) + extends MonadAsyncError[Future] { + + override def unit[T](t: T): Future[T] = Future.successful(t) + override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f) + override def flatMap[T, T2](fa: Future[T], f: (T) => Future[T2]): Future[T2] = + fa.flatMap(f) + override def error[T](t: Throwable): Future[T] = Future.failed(t) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = { + val p = Promise[T]() + register { + case Left(t) => p.failure(t) + case Right(t) => p.success(t) + } + p.future + } +} diff --git a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala index 800d66a..bdcc1b5 100644 --- a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala +++ b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala @@ -11,4 +11,9 @@ import scala.language.higherKinds trait SttpHandler[R[_], -S] { def send[T](request: Request[T, S]): R[Response[T]] def close(): Unit = {} + /** + * The monad in which the responses are wrapped. Allows writing wrapper + * handlers, which map/flatMap over the return value of [[send]]. + */ + def responseMonad: MonadError[R] } diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala index 75e372c..b18fdbb 100644 --- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala +++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala @@ -18,7 +18,7 @@ import okhttp3.{ import okio.{BufferedSink, Okio} import scala.collection.JavaConverters._ -import scala.concurrent.{Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.higherKinds abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) @@ -65,7 +65,7 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) private[okhttp] def readResponse[T]( res: OkHttpResponse, - responseAs: ResponseAs[T, S]): Response[T] = { + responseAs: ResponseAs[T, S]): R[Response[T]] = { val body = readResponseBody(res, responseAs) val headers = res @@ -73,43 +73,49 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) .names() .asScala .flatMap(name => res.headers().values(name).asScala.map((name, _))) - Response(body, res.code(), headers.toList) + + responseMonad.map(body, Response(_: T, res.code(), headers.toList)) } private def readResponseBody[T](res: OkHttpResponse, - responseAs: ResponseAs[T, S]): T = { + responseAs: ResponseAs[T, S]): R[T] = { responseAs match { - case IgnoreResponse => res.body().close() + case IgnoreResponse => responseMonad.unit(res.body().close()) case ResponseAsString(encoding) => - res.body().source().readString(Charset.forName(encoding)) - case ResponseAsByteArray => res.body().bytes() - case MappedResponseAs(raw, g) => g(readResponseBody(res, raw)) - case ResponseAsStream() => throw new IllegalStateException() + responseMonad.unit( + res.body().source().readString(Charset.forName(encoding))) + case ResponseAsByteArray => responseMonad.unit(res.body().bytes()) + case MappedResponseAs(raw, g) => + responseMonad.map(readResponseBody(res, raw), g) + case ResponseAsStream() => responseMonad.error(new IllegalStateException("Streaming isn't supported")) } } } -class OkHttpSyncClientHandler(client: OkHttpClient) +class OkHttpSyncClientHandler private (client: OkHttpClient) extends OkHttpClientHandler[Id, Nothing](client) { override def send[T](r: Request[T, Nothing]): Response[T] = { val request = convertRequest(r) val response = client.newCall(request).execute() readResponse(response, r.responseAs) } + + override def responseMonad: MonadError[Id] = IdMonad } object OkHttpSyncClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : OkHttpSyncClientHandler = + : SttpHandler[Id, Nothing] = new OkHttpSyncClientHandler(okhttpClient) } -class OkHttpFutureClientHandler(client: OkHttpClient) +class OkHttpFutureClientHandler private (client: OkHttpClient)( + implicit ec: ExecutionContext) extends OkHttpClientHandler[Future, Nothing](client) { override def send[T](r: Request[T, Nothing]): Future[Response[T]] = { val request = convertRequest(r) - val promise = Promise[Response[T]]() + val promise = Promise[Future[Response[T]]]() client .newCall(request) @@ -121,12 +127,15 @@ class OkHttpFutureClientHandler(client: OkHttpClient) promise.success(readResponse(response, r.responseAs)) }) - promise.future + promise.future.flatten } + + override def responseMonad: MonadError[Future] = new FutureMonad } object OkHttpFutureClientHandler { - def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : OkHttpFutureClientHandler = + def apply(okhttpClient: OkHttpClient = new OkHttpClient())( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + : SttpHandler[Future, Nothing] = new OkHttpFutureClientHandler(okhttpClient) } |