diff options
Diffstat (limited to 'akka-http-handler')
-rw-r--r-- | akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala | 79 |
1 files changed, 58 insertions, 21 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index 716118c..34b7df4 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -15,25 +15,26 @@ import akka.util.ByteString import com.softwaremill.sttp._ import com.softwaremill.sttp.model._ -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} import scala.collection.immutable.Seq -class AkkaHttpSttpHandler(actorSystem: ActorSystem) +class AkkaHttpSttpHandler private (actorSystem: ActorSystem, + ec: ExecutionContext, + terminateActorSystemOnClose: Boolean) extends SttpHandler[Future, Source[ByteString, Any]] { // the supported stream type private type S = Source[ByteString, Any] - def this() = this(ActorSystem("sttp")) - private implicit val as = actorSystem private implicit val materializer = ActorMaterializer() - import as.dispatcher override def send[T](r: Request[T, S]): Future[Response[T]] = { + implicit val ec = this.ec requestToAkka(r) - .map(setBodyOnAkka(r, r.body, _).get) + .flatMap(setBodyOnAkka(r, r.body, _)) + .toFuture .flatMap(Http().singleRequest(_)) .flatMap { hr => val code = hr.status.intValue() @@ -57,6 +58,8 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) private def bodyFromAkka[T](rr: ResponseAs[T, S], hr: HttpResponse): Future[T] = { + implicit val ec = this.ec + def asByteArray = hr.entity.dataBytes .runFold(ByteString(""))(_ ++ _) @@ -66,21 +69,23 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) asByteArray.map(new String(_, enc)) rr match { - case IgnoreResponse(g) => + case MappedResponseAs(raw, g) => bodyFromAkka(raw, hr).map(g) + + case IgnoreResponse => hr.discardEntityBytes() - Future.successful(g(())) + Future.successful(()) - case ResponseAsString(enc, g) => - asString(enc).map(g) + case ResponseAsString(enc) => + asString(enc) - case ResponseAsByteArray(g) => - asByteArray.map(g) + case ResponseAsByteArray => + asByteArray - case r @ ResponseAsParams(enc, g) => - asString(enc).map(r.parse).map(g) + case r @ ResponseAsParams(enc) => + asString(enc).map(r.parse) - case r @ ResponseAsStream(g) => - Future.successful(r.responseIsStream(hr.entity.dataBytes)).map(g) + case r @ ResponseAsStream() => + Future.successful(r.responseIsStream(hr.entity.dataBytes)) } } @@ -92,7 +97,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) ch :: (cl.toList ++ other) } - private def requestToAkka(r: Request[_, S]): Future[HttpRequest] = { + private def requestToAkka(r: Request[_, S]): Try[HttpRequest] = { val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method)) val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2)) @@ -104,9 +109,9 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) case ParsingResult.Ok(h, _) => h } - Future.successful(ar.withHeaders(headers.toList)) + Success(ar.withHeaders(headers.toList)) } else { - Future.failed(new RuntimeException(s"Cannot parse headers: $errors")) + Failure(new RuntimeException(s"Cannot parse headers: $errors")) } } @@ -168,7 +173,39 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) decoder.decodeMessage(response) } - def close(): Future[Terminated] = { - actorSystem.terminate() + def close(): Future[Unit] = { + implicit val ec = this.ec + if (terminateActorSystemOnClose) actorSystem.terminate().map(_ => ()) + else Future.successful(()) + } + + private implicit class RichTry[T](t: Try[T]) { + def toFuture: Future[T] = t match { + case Success(v) => Future.successful(v) + case Failure(v) => Future.failed(v) + } } } + +object AkkaHttpSttpHandler { + + /** + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def apply()( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) = + new AkkaHttpSttpHandler(ActorSystem("sttp"), ec, true) + + /** + * @param actorSystem The actor system which will be used for the http-client + * actors. + * @param ec The execution context for running non-network related operations, + * e.g. mapping responses. Defaults to the global execution + * context. + */ + def usingActorSystem(actorSystem: ActorSystem)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global) = + new AkkaHttpSttpHandler(actorSystem, ec, false) +} |