aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-08-03 11:38:23 +0200
committeradamw <adam@warski.org>2017-08-03 11:38:23 +0200
commitc07f98349ac5dc646855d00425a9dc2c3324465e (patch)
tree6f07784a68290b406a3b50a59fac36b83c5df355
parent90615deb20ce43f09371b10a25628be8d68485d8 (diff)
downloadsttp-c07f98349ac5dc646855d00425a9dc2c3324465e.tar.gz
sttp-c07f98349ac5dc646855d00425a9dc2c3324465e.tar.bz2
sttp-c07f98349ac5dc646855d00425a9dc2c3324465e.zip
Making the response monad a top-level concept, to make it possible to write SttpHandler wrappers.
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala2
-rw-r--r--async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala33
-rw-r--r--async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala7
-rw-r--r--async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala7
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala20
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala2
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/MonadError.scala44
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala5
-rw-r--r--okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala41
9 files changed, 92 insertions, 69 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
index c096aa4..d4ca3d8 100644
--- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
@@ -43,6 +43,8 @@ class AkkaHttpSttpHandler private (actorSystem: ActorSystem,
}
}
+ override def responseMonad: MonadError[Future] = new FutureMonad()(ec)
+
private def methodToAkka(m: Method): HttpMethod = m match {
case Method.GET => HttpMethods.GET
case Method.HEAD => HttpMethods.HEAD
diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
index 5d71511..3209e9b 100644
--- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
@@ -2,11 +2,8 @@ package com.softwaremill.sttp.asynchttpclient.future
import java.nio.ByteBuffer
-import com.softwaremill.sttp.SttpHandler
-import com.softwaremill.sttp.asynchttpclient.{
- AsyncHttpClientHandler,
- MonadAsyncError
-}
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
+import com.softwaremill.sttp.{FutureMonad, SttpHandler}
import org.asynchttpclient.{
AsyncHttpClient,
AsyncHttpClientConfig,
@@ -14,13 +11,13 @@ import org.asynchttpclient.{
}
import org.reactivestreams.Publisher
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future}
class FutureAsyncHttpClientHandler private (
asyncHttpClient: AsyncHttpClient,
closeClient: Boolean)(implicit ec: ExecutionContext)
extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient,
- new FutureMonad(),
+ new FutureMonad,
closeClient) {
override protected def streamBodyToPublisher(
@@ -64,25 +61,3 @@ object FutureAsyncHttpClientHandler {
: SttpHandler[Future, Nothing] =
new FutureAsyncHttpClientHandler(client, closeClient = false)
}
-
-private[future] class FutureMonad(implicit ec: ExecutionContext)
- extends MonadAsyncError[Future] {
- override def unit[T](t: T): Future[T] = Future.successful(t)
-
- override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Future[T], f: (T) => Future[T2]): Future[T2] =
- fa.flatMap(f)
-
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = {
- val p = Promise[T]()
- register {
- case Left(t) => p.failure(t)
- case Right(t) => p.success(t)
- }
- p.future
- }
-
- override def error[T](t: Throwable): Future[T] = Future.failed(t)
-}
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
index 2357c06..2407c19 100644
--- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
@@ -2,11 +2,8 @@ package com.softwaremill.sttp.asynchttpclient.monix
import java.nio.ByteBuffer
-import com.softwaremill.sttp.SttpHandler
-import com.softwaremill.sttp.asynchttpclient.{
- AsyncHttpClientHandler,
- MonadAsyncError
-}
+import com.softwaremill.sttp.{MonadAsyncError, SttpHandler}
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
import monix.eval.Task
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.Observable
diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
index ef06a41..01c3cdb 100644
--- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
@@ -2,11 +2,8 @@ package com.softwaremill.sttp.asynchttpclient.scalaz
import java.nio.ByteBuffer
-import com.softwaremill.sttp.SttpHandler
-import com.softwaremill.sttp.asynchttpclient.{
- AsyncHttpClientHandler,
- MonadAsyncError
-}
+import com.softwaremill.sttp.{MonadAsyncError, SttpHandler}
+import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
import org.asynchttpclient.{
AsyncHttpClient,
AsyncHttpClientConfig,
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
index 4824b39..f5b4569 100644
--- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
+++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
@@ -5,10 +5,12 @@ import java.nio.charset.Charset
import com.softwaremill.sttp.model._
import com.softwaremill.sttp.{
+ ContentLengthHeader,
+ MonadAsyncError,
+ MonadError,
Request,
Response,
- SttpHandler,
- ContentLengthHeader
+ SttpHandler
}
import org.asynchttpclient.AsyncHandler.State
import org.asynchttpclient.handler.StreamedAsyncHandler
@@ -50,11 +52,11 @@ abstract class AsyncHttpClientHandler[R[_], S](asyncHttpClient: AsyncHttpClient,
preparedRequest
.execute(eagerAsyncHandler(ra, success, error))
}
-
})
-
}
+ override def responseMonad: MonadError[R] = rm
+
protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer]
protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S
@@ -234,16 +236,6 @@ abstract class AsyncHttpClientHandler[R[_], S](asyncHttpClient: AsyncHttpClient,
}
}
-trait MonadAsyncError[R[_]] {
- def unit[T](t: T): R[T]
- def map[T, T2](fa: R[T], f: T => T2): R[T2]
- def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2]
- def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T]
- def error[T](t: Throwable): R[T]
-
- def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity)
-}
-
object EmptyPublisher extends Publisher[ByteBuffer] {
override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {
s.onComplete()
diff --git a/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala
index 6c3368a..d5c2ccd 100644
--- a/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala
+++ b/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala
@@ -33,6 +33,8 @@ object HttpURLConnectionSttpHandler extends SttpHandler[Id, Nothing] {
}
}
+ override def responseMonad: MonadError[Id] = IdMonad
+
private def setBody(body: RequestBody[Nothing], c: HttpURLConnection): Unit = {
if (body != NoBody) c.setDoOutput(true)
diff --git a/core/src/main/scala/com/softwaremill/sttp/MonadError.scala b/core/src/main/scala/com/softwaremill/sttp/MonadError.scala
new file mode 100644
index 0000000..e5e7ab8
--- /dev/null
+++ b/core/src/main/scala/com/softwaremill/sttp/MonadError.scala
@@ -0,0 +1,44 @@
+package com.softwaremill.sttp
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.language.higherKinds
+
+trait MonadError[R[_]] {
+ def unit[T](t: T): R[T]
+ def map[T, T2](fa: R[T], f: T => T2): R[T2]
+ def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2]
+ def error[T](t: Throwable): R[T]
+
+ def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity)
+}
+
+trait MonadAsyncError[R[_]] extends MonadError[R] {
+ def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T]
+}
+
+object IdMonad extends MonadError[Id] {
+ override def unit[T](t: T): Id[T] = t
+ override def map[T, T2](fa: Id[T], f: (T) => T2): Id[T2] = f(fa)
+ override def flatMap[T, T2](fa: Id[T], f: (T) => Id[T2]): Id[T2] = f(fa)
+ override def error[T](t: Throwable): Id[T] = throw t
+}
+
+class FutureMonad(implicit ec: ExecutionContext)
+ extends MonadAsyncError[Future] {
+
+ override def unit[T](t: T): Future[T] = Future.successful(t)
+ override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f)
+ override def flatMap[T, T2](fa: Future[T], f: (T) => Future[T2]): Future[T2] =
+ fa.flatMap(f)
+ override def error[T](t: Throwable): Future[T] = Future.failed(t)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = {
+ val p = Promise[T]()
+ register {
+ case Left(t) => p.failure(t)
+ case Right(t) => p.success(t)
+ }
+ p.future
+ }
+}
diff --git a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala
index 800d66a..bdcc1b5 100644
--- a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala
+++ b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala
@@ -11,4 +11,9 @@ import scala.language.higherKinds
trait SttpHandler[R[_], -S] {
def send[T](request: Request[T, S]): R[Response[T]]
def close(): Unit = {}
+ /**
+ * The monad in which the responses are wrapped. Allows writing wrapper
+ * handlers, which map/flatMap over the return value of [[send]].
+ */
+ def responseMonad: MonadError[R]
}
diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
index 75e372c..b18fdbb 100644
--- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
+++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
@@ -18,7 +18,7 @@ import okhttp3.{
import okio.{BufferedSink, Okio}
import scala.collection.JavaConverters._
-import scala.concurrent.{Future, Promise}
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.higherKinds
abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
@@ -65,7 +65,7 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
private[okhttp] def readResponse[T](
res: OkHttpResponse,
- responseAs: ResponseAs[T, S]): Response[T] = {
+ responseAs: ResponseAs[T, S]): R[Response[T]] = {
val body = readResponseBody(res, responseAs)
val headers = res
@@ -73,43 +73,49 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
.names()
.asScala
.flatMap(name => res.headers().values(name).asScala.map((name, _)))
- Response(body, res.code(), headers.toList)
+
+ responseMonad.map(body, Response(_: T, res.code(), headers.toList))
}
private def readResponseBody[T](res: OkHttpResponse,
- responseAs: ResponseAs[T, S]): T = {
+ responseAs: ResponseAs[T, S]): R[T] = {
responseAs match {
- case IgnoreResponse => res.body().close()
+ case IgnoreResponse => responseMonad.unit(res.body().close())
case ResponseAsString(encoding) =>
- res.body().source().readString(Charset.forName(encoding))
- case ResponseAsByteArray => res.body().bytes()
- case MappedResponseAs(raw, g) => g(readResponseBody(res, raw))
- case ResponseAsStream() => throw new IllegalStateException()
+ responseMonad.unit(
+ res.body().source().readString(Charset.forName(encoding)))
+ case ResponseAsByteArray => responseMonad.unit(res.body().bytes())
+ case MappedResponseAs(raw, g) =>
+ responseMonad.map(readResponseBody(res, raw), g)
+ case ResponseAsStream() => responseMonad.error(new IllegalStateException("Streaming isn't supported"))
}
}
}
-class OkHttpSyncClientHandler(client: OkHttpClient)
+class OkHttpSyncClientHandler private (client: OkHttpClient)
extends OkHttpClientHandler[Id, Nothing](client) {
override def send[T](r: Request[T, Nothing]): Response[T] = {
val request = convertRequest(r)
val response = client.newCall(request).execute()
readResponse(response, r.responseAs)
}
+
+ override def responseMonad: MonadError[Id] = IdMonad
}
object OkHttpSyncClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())
- : OkHttpSyncClientHandler =
+ : SttpHandler[Id, Nothing] =
new OkHttpSyncClientHandler(okhttpClient)
}
-class OkHttpFutureClientHandler(client: OkHttpClient)
+class OkHttpFutureClientHandler private (client: OkHttpClient)(
+ implicit ec: ExecutionContext)
extends OkHttpClientHandler[Future, Nothing](client) {
override def send[T](r: Request[T, Nothing]): Future[Response[T]] = {
val request = convertRequest(r)
- val promise = Promise[Response[T]]()
+ val promise = Promise[Future[Response[T]]]()
client
.newCall(request)
@@ -121,12 +127,15 @@ class OkHttpFutureClientHandler(client: OkHttpClient)
promise.success(readResponse(response, r.responseAs))
})
- promise.future
+ promise.future.flatten
}
+
+ override def responseMonad: MonadError[Future] = new FutureMonad
}
object OkHttpFutureClientHandler {
- def apply(okhttpClient: OkHttpClient = new OkHttpClient())
- : OkHttpFutureClientHandler =
+ def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
+ : SttpHandler[Future, Nothing] =
new OkHttpFutureClientHandler(okhttpClient)
}