diff options
11 files changed, 170 insertions, 105 deletions
@@ -203,10 +203,10 @@ in a `Future`. Next you'll need to add an implicit value: ```scala -implicit val sttpHandler = new AkkaHttpSttpHandler() +implicit val sttpHandler = AkkaHttpSttpHandler() // or, if you'd like to use an existing actor system: -implicit val sttpHandler = new AkkaHttpSttpHandler(actorSystem) +implicit val sttpHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem) ``` This backend supports sending and receiving @@ -238,7 +238,7 @@ import com.softwaremill.sttp.akkahttp._ import akka.stream.scaladsl.Source import akka.util.ByteString -implicit val sttpHandler = new AkkaHttpSttpHandler(actorSystem) +implicit val sttpHandler = AkkaHttpSttpHandler() val response: Future[Response[Source[ByteString, Any]]] = sttp diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index 716118c..34b7df4 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -15,25 +15,26 @@ import akka.util.ByteString import com.softwaremill.sttp._ import com.softwaremill.sttp.model._ -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} import scala.collection.immutable.Seq -class AkkaHttpSttpHandler(actorSystem: ActorSystem) +class AkkaHttpSttpHandler private (actorSystem: ActorSystem, + ec: ExecutionContext, + terminateActorSystemOnClose: Boolean) extends SttpHandler[Future, Source[ByteString, Any]] { // the supported stream type private type S = Source[ByteString, Any] - def this() = this(ActorSystem("sttp")) - private implicit val as = actorSystem private implicit val materializer = ActorMaterializer() - import as.dispatcher override def send[T](r: Request[T, S]): Future[Response[T]] = { + implicit val ec = this.ec requestToAkka(r) - .map(setBodyOnAkka(r, r.body, _).get) + .flatMap(setBodyOnAkka(r, r.body, _)) + .toFuture .flatMap(Http().singleRequest(_)) .flatMap { hr => val code = hr.status.intValue() @@ -57,6 +58,8 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) private def bodyFromAkka[T](rr: ResponseAs[T, S], hr: HttpResponse): Future[T] = { + implicit val ec = this.ec + def asByteArray = hr.entity.dataBytes .runFold(ByteString(""))(_ ++ _) @@ -66,21 +69,23 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) asByteArray.map(new String(_, enc)) rr match { - case IgnoreResponse(g) => + case MappedResponseAs(raw, g) => bodyFromAkka(raw, hr).map(g) + + case IgnoreResponse => hr.discardEntityBytes() - Future.successful(g(())) + Future.successful(()) - case ResponseAsString(enc, g) => - asString(enc).map(g) + case ResponseAsString(enc) => + asString(enc) - case ResponseAsByteArray(g) => - asByteArray.map(g) + case ResponseAsByteArray => + asByteArray - case r @ ResponseAsParams(enc, g) => - asString(enc).map(r.parse).map(g) + case r @ ResponseAsParams(enc) => + asString(enc).map(r.parse) - case r @ ResponseAsStream(g) => - Future.successful(r.responseIsStream(hr.entity.dataBytes)).map(g) + case r @ ResponseAsStream() => + Future.successful(r.responseIsStream(hr.entity.dataBytes)) } } @@ -92,7 +97,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) ch :: (cl.toList ++ other) } - private def requestToAkka(r: Request[_, S]): Future[HttpRequest] = { + private def requestToAkka(r: Request[_, S]): Try[HttpRequest] = { val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method)) val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2)) @@ -104,9 +109,9 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) case ParsingResult.Ok(h, _) => h } - Future.successful(ar.withHeaders(headers.toList)) + Success(ar.withHeaders(headers.toList)) } else { - Future.failed(new RuntimeException(s"Cannot parse headers: $errors")) + Failure(new RuntimeException(s"Cannot parse headers: $errors")) } } @@ -168,7 +173,39 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) decoder.decodeMessage(response) } - def close(): Future[Terminated] = { - actorSystem.terminate() + def close(): Future[Unit] = { + implicit val ec = this.ec + if (terminateActorSystemOnClose) actorSystem.terminate().map(_ => ()) + else Future.successful(()) + } + + private implicit class RichTry[T](t: Try[T]) { + def toFuture: Future[T] = t match { + case Success(v) => Future.successful(v) + case Failure(v) => Future.failed(v) + } } } + +object AkkaHttpSttpHandler { + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def apply()( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) = + new AkkaHttpSttpHandler(ActorSystem("sttp"), ec, true) + + /** + * @param actorSystem The actor system which will be used for the http-client + * actors. + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingActorSystem(actorSystem: ActorSystem)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) = + new AkkaHttpSttpHandler(actorSystem, ec, false) +} diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala index 59119d7..41fcf68 100644 --- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala +++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala @@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.future import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, - WrapperFromAsync + MonadAsyncError } import org.asynchttpclient.{ AsyncHttpClient, @@ -10,18 +10,27 @@ import org.asynchttpclient.{ DefaultAsyncHttpClient } -import scala.concurrent.{Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} -class FutureAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Future](asyncHttpClient, FutureFromAsync) { +class FutureAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) + extends AsyncHttpClientHandler[Future](asyncHttpClient, new FutureMonad()) { def this() = this(new DefaultAsyncHttpClient()) def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) } -private[asynchttpclient] object FutureFromAsync - extends WrapperFromAsync[Future] { - override def apply[T]( +private[future] class FutureMonad(implicit ec: ExecutionContext) + extends MonadAsyncError[Future] { + override def unit[T](t: T): Future[T] = Future.successful(t) + + override def map[T, T2](fa: Future[T], f: (T) => T2): Future[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Future[T], + f: (T) => Future[T2]): Future[T2] = + fa.flatMap(f) + + override def async[T]( register: ((Either[Throwable, T]) => Unit) => Unit): Future[T] = { val p = Promise[T]() register { @@ -30,4 +39,6 @@ private[asynchttpclient] object FutureFromAsync } p.future } + + override def error[T](t: Throwable): Future[T] = Future.failed(t) } diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala index de0c139..30106f2 100644 --- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala +++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala @@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.monix import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, - WrapperFromAsync + MonadAsyncError } import monix.eval.Task import monix.execution.Cancelable @@ -15,14 +15,21 @@ import org.asynchttpclient.{ import scala.util.{Failure, Success} class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskFromAsync) { + extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { def this() = this(new DefaultAsyncHttpClient()) def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) } -private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { - override def apply[T]( +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = Task.async { (_, cb) => register { @@ -32,4 +39,6 @@ private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { Cancelable.empty } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) } diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala index ab2e261..57d65c6 100644 --- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala +++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala @@ -2,7 +2,7 @@ package com.softwaremill.sttp.asynchttpclient.scalaz import com.softwaremill.sttp.asynchttpclient.{ AsyncHttpClientHandler, - WrapperFromAsync + MonadAsyncError } import org.asynchttpclient.{ AsyncHttpClient, @@ -14,14 +14,21 @@ import scalaz.{-\/, \/-} import scalaz.concurrent.Task class ScalazAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient) - extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskFromAsync) { + extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) { def this() = this(new DefaultAsyncHttpClient()) def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg)) } -private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { - override def apply[T]( +private[scalaz] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.point(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = Task.async { cb => register { @@ -29,4 +36,6 @@ private[asynchttpclient] object TaskFromAsync extends WrapperFromAsync[Task] { case Right(t) => cb(\/-(t)) } } + + override def error[T](t: Throwable): Task[T] = Task.fail(t) } diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala index 98160c5..1683c3d 100644 --- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala +++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala @@ -16,11 +16,11 @@ import scala.collection.JavaConverters._ import scala.language.higherKinds class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, - wrapper: WrapperFromAsync[R]) + rm: MonadAsyncError[R]) extends SttpHandler[R, Nothing] { override def send[T](r: Request[T, Nothing]): R[Response[T]] = { - wrapper { cb => + rm.flatten(rm.async[R[Response[T]]] { cb => asyncHttpClient .prepareRequest(requestToAsync(r)) .execute(new AsyncCompletionHandler[AsyncResponse] { @@ -30,7 +30,7 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, } override def onThrowable(t: Throwable): Unit = cb(Left(t)) }) - } + }) } private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = { @@ -70,44 +70,55 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient, private def readResponse[T]( response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): Response[T] = { - Response(readResponseBody(response, responseAs), - response.getStatusCode, - response.getHeaders - .iterator() - .asScala - .map(e => (e.getKey, e.getValue)) - .toList) + responseAs: ResponseAs[T, Nothing]): R[Response[T]] = { + val body = readResponseBody(response, responseAs) + rm.map(body, + Response(_: T, + response.getStatusCode, + response.getHeaders + .iterator() + .asScala + .map(e => (e.getKey, e.getValue)) + .toList)) } private def readResponseBody[T](response: AsyncResponse, - responseAs: ResponseAs[T, Nothing]): T = { + responseAs: ResponseAs[T, Nothing]): R[T] = { def asString(enc: String) = response.getResponseBody(Charset.forName(enc)) responseAs match { - case IgnoreResponse(g) => + case MappedResponseAs(raw, g) => + rm.map(readResponseBody(response, raw), g) + + case IgnoreResponse => // getting the body and discarding it response.getResponseBodyAsBytes - g(()) + rm.unit(()) - case ResponseAsString(enc, g) => - g(asString(enc)) + case ResponseAsString(enc) => + rm.unit(asString(enc)) - case ResponseAsByteArray(g) => - g(response.getResponseBodyAsBytes) + case ResponseAsByteArray => + rm.unit(response.getResponseBodyAsBytes) - case r @ ResponseAsParams(enc, g) => - g(r.parse(asString(enc))) + case r @ ResponseAsParams(enc) => + rm.unit(r.parse(asString(enc))) - case ResponseAsStream(_) => + case ResponseAsStream() => // only possible when the user requests the response as a stream of // Nothing. Oh well ... - throw new IllegalStateException() + rm.error(new IllegalStateException()) } } } -trait WrapperFromAsync[R[_]] { - def apply[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] +trait MonadAsyncError[R[_]] { + def unit[T](t: T): R[T] + def map[T, T2](fa: R[T], f: T => T2): R[T2] + def flatMap[T, T2](fa: R[T], f: T => R[T2]): R[T2] + def async[T](register: (Either[Throwable, T] => Unit) => Unit): R[T] + def error[T](t: Throwable): R[T] + + def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity) } diff --git a/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala index f5a5971..1ef7fc2 100644 --- a/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala +++ b/core/src/main/scala/com/softwaremill/sttp/HttpURLConnectionSttpHandler.scala @@ -101,15 +101,16 @@ object HttpURLConnectionSttpHandler extends SttpHandler[Id, Nothing] { def asString(enc: String) = Source.fromInputStream(is, enc).mkString responseAs match { - case IgnoreResponse(g) => - @tailrec def consume(): Unit = if (is.read() != -1) consume() + case MappedResponseAs(raw, g) => g(readResponseBody(is, raw)) - g(consume()) + case IgnoreResponse => + @tailrec def consume(): Unit = if (is.read() != -1) consume() + consume() - case ResponseAsString(enc, g) => - g(asString(enc)) + case ResponseAsString(enc) => + asString(enc) - case ResponseAsByteArray(g) => + case ResponseAsByteArray => val os = new ByteArrayOutputStream var read = 0 val buf = new Array[Byte](1024) @@ -125,12 +126,12 @@ object HttpURLConnectionSttpHandler extends SttpHandler[Id, Nothing] { transfer() - g(os.toByteArray) + os.toByteArray - case r @ ResponseAsParams(enc, g) => - g(r.parse(asString(enc))) + case r @ ResponseAsParams(enc) => + r.parse(asString(enc)) - case ResponseAsStream(_) => + case ResponseAsStream() => // only possible when the user requests the response as a stream of // Nothing. Oh well ... throw new IllegalStateException() diff --git a/core/src/main/scala/com/softwaremill/sttp/model/package.scala b/core/src/main/scala/com/softwaremill/sttp/model/package.scala index def6132..e0b4048 100644 --- a/core/src/main/scala/com/softwaremill/sttp/model/package.scala +++ b/core/src/main/scala/com/softwaremill/sttp/model/package.scala @@ -47,29 +47,16 @@ package object model { * @tparam S If `T` is a stream, the type of the stream. Otherwise, `Nothing`. */ sealed trait ResponseAs[T, +S] { - def map[T2](f: T => T2): ResponseAs[T2, S] + def map[T2](f: T => T2): ResponseAs[T2, S] = + MappedResponseAs[T, T2, S](this, f) } - case class IgnoreResponse[T](g: Unit => T) extends ResponseAs[T, Nothing] { - override def map[T2](f: T => T2): ResponseAs[T2, Nothing] = - IgnoreResponse(g andThen f) - } - case class ResponseAsString[T](encoding: String, g: String => T) - extends ResponseAs[T, Nothing] { - override def map[T2](f: T => T2): ResponseAs[T2, Nothing] = - ResponseAsString(encoding, g andThen f) - } - case class ResponseAsByteArray[T](g: Array[Byte] => T) - extends ResponseAs[T, Nothing] { - override def map[T2](f: T => T2): ResponseAs[T2, Nothing] = - ResponseAsByteArray(g andThen f) - } - case class ResponseAsParams[T](encoding: String, - g: Seq[(String, String)] => T) - extends ResponseAs[T, Nothing] { - - override def map[T2](f: T => T2): ResponseAs[T2, Nothing] = - ResponseAsParams(encoding, g andThen f) + case object IgnoreResponse extends ResponseAs[Unit, Nothing] + case class ResponseAsString(encoding: String) + extends ResponseAs[String, Nothing] + case object ResponseAsByteArray extends ResponseAs[Array[Byte], Nothing] + case class ResponseAsParams(encoding: String) + extends ResponseAs[Seq[(String, String)], Nothing] { private[sttp] def parse(s: String): Seq[(String, String)] = { s.split("&") @@ -84,11 +71,11 @@ package object model { }) } } - case class ResponseAsStream[T, T2, S](g: T => T2)( - implicit val responseIsStream: S =:= T) + case class ResponseAsStream[T, S]()(implicit val responseIsStream: S =:= T) + extends ResponseAs[T, S] + case class MappedResponseAs[T, T2, S](raw: ResponseAs[T, S], g: T => T2) extends ResponseAs[T2, S] { - override def map[T3](f: T2 => T3): ResponseAs[T3, S] = - ResponseAsStream(g andThen f) + MappedResponseAs[T, T3, S](raw, g andThen f) } } diff --git a/core/src/main/scala/com/softwaremill/sttp/package.scala b/core/src/main/scala/com/softwaremill/sttp/package.scala index aa2224a..f9ee9c6 100644 --- a/core/src/main/scala/com/softwaremill/sttp/package.scala +++ b/core/src/main/scala/com/softwaremill/sttp/package.scala @@ -16,16 +16,16 @@ package object sttp { type Id[X] = X type Empty[X] = None.type - def ignore: ResponseAs[Unit, Nothing] = IgnoreResponse(identity) + def ignore: ResponseAs[Unit, Nothing] = IgnoreResponse /** * Uses `utf-8` encoding. */ def asString: ResponseAs[String, Nothing] = asString(Utf8) def asString(encoding: String): ResponseAs[String, Nothing] = - ResponseAsString(encoding, identity) + ResponseAsString(encoding) def asByteArray: ResponseAs[Array[Byte], Nothing] = - ResponseAsByteArray(identity) + ResponseAsByteArray /** * Uses `utf-8` encoding. @@ -33,9 +33,9 @@ package object sttp { def asParams: ResponseAs[Seq[(String, String)], Nothing] = asParams(Utf8) def asParams(encoding: String): ResponseAs[Seq[(String, String)], Nothing] = - ResponseAsParams(encoding, identity) + ResponseAsParams(encoding) - def asStream[S]: ResponseAs[S, S] = ResponseAsStream[S, S, S](identity) + def asStream[S]: ResponseAs[S, S] = ResponseAsStream[S, S]() /** * Use the factory methods `multiPart` to conveniently create instances of diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala index 030f90c..4e9c80e 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala @@ -117,7 +117,7 @@ class BasicTests runTests("HttpURLConnection")(HttpURLConnectionSttpHandler, ForceWrappedValue.id) - runTests("Akka HTTP")(new AkkaHttpSttpHandler(actorSystem), + runTests("Akka HTTP")(AkkaHttpSttpHandler.usingActorSystem(actorSystem), ForceWrappedValue.future) runTests("Async Http Client - Future")(new FutureAsyncHttpClientHandler(), ForceWrappedValue.future) diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index 95a8bed..a67be2e 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -34,7 +34,7 @@ class StreamingTests akkaStreamingTests() def akkaStreamingTests(): Unit = { - implicit val handler = new AkkaHttpSttpHandler(actorSystem) + implicit val handler = AkkaHttpSttpHandler.usingActorSystem(actorSystem) val body = "streaming test" |