aboutsummaryrefslogtreecommitdiff
path: root/akka-http-handler
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-07-24 12:18:27 +0200
committeradamw <adam@warski.org>2017-07-24 12:18:27 +0200
commitccd2c4b1d53bf68e04ff1f8bca032d870494d9a8 (patch)
treee298b14664b07dc9aab54f74abe956fb797fe1bb /akka-http-handler
parentfef16dd2dbd0f53ee7432ab2ff39255279932ac4 (diff)
downloadsttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.gz
sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.tar.bz2
sttp-ccd2c4b1d53bf68e04ff1f8bca032d870494d9a8.zip
Better responseAs mapping, done on the client thread pool
Diffstat (limited to 'akka-http-handler')
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala79
1 files changed, 58 insertions, 21 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
index 716118c..34b7df4 100644
--- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
@@ -15,25 +15,26 @@ import akka.util.ByteString
import com.softwaremill.sttp._
import com.softwaremill.sttp.model._
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.collection.immutable.Seq
-class AkkaHttpSttpHandler(actorSystem: ActorSystem)
+class AkkaHttpSttpHandler private (actorSystem: ActorSystem,
+ ec: ExecutionContext,
+ terminateActorSystemOnClose: Boolean)
extends SttpHandler[Future, Source[ByteString, Any]] {
// the supported stream type
private type S = Source[ByteString, Any]
- def this() = this(ActorSystem("sttp"))
-
private implicit val as = actorSystem
private implicit val materializer = ActorMaterializer()
- import as.dispatcher
override def send[T](r: Request[T, S]): Future[Response[T]] = {
+ implicit val ec = this.ec
requestToAkka(r)
- .map(setBodyOnAkka(r, r.body, _).get)
+ .flatMap(setBodyOnAkka(r, r.body, _))
+ .toFuture
.flatMap(Http().singleRequest(_))
.flatMap { hr =>
val code = hr.status.intValue()
@@ -57,6 +58,8 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
private def bodyFromAkka[T](rr: ResponseAs[T, S],
hr: HttpResponse): Future[T] = {
+ implicit val ec = this.ec
+
def asByteArray =
hr.entity.dataBytes
.runFold(ByteString(""))(_ ++ _)
@@ -66,21 +69,23 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
asByteArray.map(new String(_, enc))
rr match {
- case IgnoreResponse(g) =>
+ case MappedResponseAs(raw, g) => bodyFromAkka(raw, hr).map(g)
+
+ case IgnoreResponse =>
hr.discardEntityBytes()
- Future.successful(g(()))
+ Future.successful(())
- case ResponseAsString(enc, g) =>
- asString(enc).map(g)
+ case ResponseAsString(enc) =>
+ asString(enc)
- case ResponseAsByteArray(g) =>
- asByteArray.map(g)
+ case ResponseAsByteArray =>
+ asByteArray
- case r @ ResponseAsParams(enc, g) =>
- asString(enc).map(r.parse).map(g)
+ case r @ ResponseAsParams(enc) =>
+ asString(enc).map(r.parse)
- case r @ ResponseAsStream(g) =>
- Future.successful(r.responseIsStream(hr.entity.dataBytes)).map(g)
+ case r @ ResponseAsStream() =>
+ Future.successful(r.responseIsStream(hr.entity.dataBytes))
}
}
@@ -92,7 +97,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
ch :: (cl.toList ++ other)
}
- private def requestToAkka(r: Request[_, S]): Future[HttpRequest] = {
+ private def requestToAkka(r: Request[_, S]): Try[HttpRequest] = {
val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method))
val parsed =
r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2))
@@ -104,9 +109,9 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
case ParsingResult.Ok(h, _) => h
}
- Future.successful(ar.withHeaders(headers.toList))
+ Success(ar.withHeaders(headers.toList))
} else {
- Future.failed(new RuntimeException(s"Cannot parse headers: $errors"))
+ Failure(new RuntimeException(s"Cannot parse headers: $errors"))
}
}
@@ -168,7 +173,39 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
decoder.decodeMessage(response)
}
- def close(): Future[Terminated] = {
- actorSystem.terminate()
+ def close(): Future[Unit] = {
+ implicit val ec = this.ec
+ if (terminateActorSystemOnClose) actorSystem.terminate().map(_ => ())
+ else Future.successful(())
+ }
+
+ private implicit class RichTry[T](t: Try[T]) {
+ def toFuture: Future[T] = t match {
+ case Success(v) => Future.successful(v)
+ case Failure(v) => Future.failed(v)
+ }
}
}
+
+object AkkaHttpSttpHandler {
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def apply()(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global) =
+ new AkkaHttpSttpHandler(ActorSystem("sttp"), ec, true)
+
+ /**
+ * @param actorSystem The actor system which will be used for the http-client
+ * actors.
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
+ def usingActorSystem(actorSystem: ActorSystem)(
+ implicit ec: ExecutionContext = ExecutionContext.Implicits.global) =
+ new AkkaHttpSttpHandler(actorSystem, ec, false)
+}