diff options
Diffstat (limited to 'akka-http-backend/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpBackend.scala')
-rw-r--r-- | akka-http-backend/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpBackend.scala | 112 |
1 files changed, 39 insertions, 73 deletions
diff --git a/akka-http-backend/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpBackend.scala b/akka-http-backend/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpBackend.scala index 5956793..0dabac7 100644 --- a/akka-http-backend/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpBackend.scala +++ b/akka-http-backend/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpBackend.scala @@ -4,18 +4,13 @@ import java.io.{File, IOException, UnsupportedEncodingException} import akka.actor.ActorSystem import akka.event.LoggingAdapter -import akka.http.scaladsl.{ClientTransport, Http, HttpsConnectionContext} import akka.http.scaladsl.coding.{Deflate, Gzip, NoCoding} import akka.http.scaladsl.model.ContentTypes.`application/octet-stream` import akka.http.scaladsl.model.HttpHeader.ParsingResult -import akka.http.scaladsl.model.headers.{ - HttpEncodings, - `Content-Length`, - `Content-Type` -} +import akka.http.scaladsl.model.headers.{HttpEncodings, `Content-Length`, `Content-Type`} import akka.http.scaladsl.model.{Multipart => AkkaMultipart, _} -import akka.http.scaladsl.settings.ClientConnectionSettings -import akka.http.scaladsl.settings.ConnectionPoolSettings +import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import akka.http.scaladsl.{ClientTransport, Http, HttpsConnectionContext} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{FileIO, Source, StreamConverters} import akka.util.ByteString @@ -25,14 +20,13 @@ import scala.collection.immutable.Seq import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} -class AkkaHttpBackend private ( - actorSystem: ActorSystem, - ec: ExecutionContext, - terminateActorSystemOnClose: Boolean, - opts: SttpBackendOptions, - customHttpsContext: Option[HttpsConnectionContext], - customConnectionPoolSettings: Option[ConnectionPoolSettings], - customLog: Option[LoggingAdapter]) +class AkkaHttpBackend private (actorSystem: ActorSystem, + ec: ExecutionContext, + terminateActorSystemOnClose: Boolean, + opts: SttpBackendOptions, + customHttpsContext: Option[HttpsConnectionContext], + customConnectionPoolSettings: Option[ConnectionPoolSettings], + customLog: Option[LoggingAdapter]) extends SttpBackend[Future, Source[ByteString, Any]] { // the supported stream type @@ -47,8 +41,7 @@ class AkkaHttpBackend private ( .withConnectingTimeout(opts.connectionTimeout) private val connectionPoolSettings = { - val base = customConnectionPoolSettings.getOrElse( - ConnectionPoolSettings(actorSystem)) + val base = customConnectionPoolSettings.getOrElse(ConnectionPoolSettings(actorSystem)) opts.proxy match { case None => base case Some(p) => @@ -60,8 +53,7 @@ class AkkaHttpBackend private ( implicit val ec: ExecutionContext = this.ec val settings = connectionPoolSettings - .withConnectionSettings( - connectionSettings.withIdleTimeout(r.options.readTimeout)) + .withConnectionSettings(connectionSettings.withIdleTimeout(r.options.readTimeout)) requestToAkka(r) .flatMap(setBodyOnAkka(r, r.body, _)) @@ -70,8 +62,7 @@ class AkkaHttpBackend private ( req => http.singleRequest(req, settings = settings, - connectionContext = customHttpsContext.getOrElse( - http.defaultClientHttpsContext), + connectionContext = customHttpsContext.getOrElse(http.defaultClientHttpsContext), log = customLog.getOrElse(actorSystem.log))) .flatMap { hr => val code = hr.status.intValue() @@ -110,9 +101,7 @@ class AkkaHttpBackend private ( case _ => HttpMethod.custom(m.m) } - private def bodyFromAkka[T](rr: ResponseAs[T, S], - hr: HttpResponse, - charsetFromHeaders: Option[String]): Future[T] = { + private def bodyFromAkka[T](rr: ResponseAs[T, S], hr: HttpResponse, charsetFromHeaders: Option[String]): Future[T] = { implicit val ec: ExecutionContext = this.ec @@ -126,8 +115,7 @@ class AkkaHttpBackend private ( file.getParentFile.mkdirs() file.createNewFile() } else if (!overwrite) { - throw new IOException( - s"File ${file.getAbsolutePath} exists - overwriting prohibited") + throw new IOException(s"File ${file.getAbsolutePath} exists - overwriting prohibited") } hr.entity.dataBytes.runWith(FileIO.toPath(file.toPath)) @@ -168,8 +156,7 @@ class AkkaHttpBackend private ( headersToAkka(r.headers).map(ar.withHeaders) } - private def headersToAkka( - headers: Seq[(String, String)]): Try[Seq[HttpHeader]] = { + private def headersToAkka(headers: Seq[(String, String)]): Try[Seq[HttpHeader]] = { // content-type and content-length headers have to be set via the body // entity, not as headers val parsed = @@ -200,9 +187,7 @@ class AkkaHttpBackend private ( else Failure[Seq[T]](fs.head.exception) } - private def setBodyOnAkka(r: Request[_, S], - body: RequestBody[S], - ar: HttpRequest): Try[HttpRequest] = { + private def setBodyOnAkka(r: Request[_, S], body: RequestBody[S], ar: HttpRequest): Try[HttpRequest] = { def ctWithEncoding(ct: ContentType, encoding: String) = HttpCharsets .getForKey(encoding) @@ -226,13 +211,9 @@ class AkkaHttpBackend private ( headers <- headersToAkka(mp.additionalHeaders.toList) } yield { val dispositionParams = - mp.fileName.fold(Map.empty[String, String])(fn => - Map("filename" -> fn)) + mp.fileName.fold(Map.empty[String, String])(fn => Map("filename" -> fn)) - AkkaMultipart.FormData.BodyPart(mp.name, - entity(ct), - dispositionParams, - headers) + AkkaMultipart.FormData.BodyPart(mp.name, entity(ct), dispositionParams, headers) } } @@ -240,44 +221,34 @@ class AkkaHttpBackend private ( body match { case NoBody => Success(ar) case StringBody(b, encoding, _) => - Success( - ar.withEntity(ctWithEncoding(ct, encoding), b.getBytes(encoding))) + Success(ar.withEntity(ctWithEncoding(ct, encoding), b.getBytes(encoding))) case ByteArrayBody(b, _) => Success(ar.withEntity(HttpEntity(ct, b))) case ByteBufferBody(b, _) => Success(ar.withEntity(HttpEntity(ct, ByteString(b)))) case InputStreamBody(b, _) => - Success( - ar.withEntity( - HttpEntity(ct, StreamConverters.fromInputStream(() => b)))) + Success(ar.withEntity(HttpEntity(ct, StreamConverters.fromInputStream(() => b)))) case PathBody(b, _) => Success(ar.withEntity(ct, b)) case StreamBody(s) => Success(ar.withEntity(HttpEntity(ct, s))) case MultipartBody(ps) => traverseTry(ps.map(toBodyPart)) - .map(bodyParts => - ar.withEntity(AkkaMultipart.FormData(bodyParts: _*).toEntity())) + .map(bodyParts => ar.withEntity(AkkaMultipart.FormData(bodyParts: _*).toEntity())) } } } - private def parseContentTypeOrOctetStream( - r: Request[_, S]): Try[ContentType] = { + private def parseContentTypeOrOctetStream(r: Request[_, S]): Try[ContentType] = { parseContentTypeOrOctetStream( r.headers .find(isContentType) .map(_._2)) } - private def parseContentTypeOrOctetStream( - ctHeader: Option[String]): Try[ContentType] = { + private def parseContentTypeOrOctetStream(ctHeader: Option[String]): Try[ContentType] = { ctHeader .map { ct => ContentType .parse(ct) - .fold( - errors => - Failure( - new RuntimeException(s"Cannot parse content type: $errors")), - Success(_)) + .fold(errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")), Success(_)) } .getOrElse(Success(`application/octet-stream`)) } @@ -314,15 +285,13 @@ class AkkaHttpBackend private ( } object AkkaHttpBackend { - private def apply( - actorSystem: ActorSystem, - ec: ExecutionContext, - terminateActorSystemOnClose: Boolean, - options: SttpBackendOptions, - customHttpsContext: Option[HttpsConnectionContext], - customConnectionPoolSettings: Option[ConnectionPoolSettings], - customLog: Option[LoggingAdapter]) - : SttpBackend[Future, Source[ByteString, Any]] = + private def apply(actorSystem: ActorSystem, + ec: ExecutionContext, + terminateActorSystemOnClose: Boolean, + options: SttpBackendOptions, + customHttpsContext: Option[HttpsConnectionContext], + customConnectionPoolSettings: Option[ConnectionPoolSettings], + customLog: Option[LoggingAdapter]): SttpBackend[Future, Source[ByteString, Any]] = new FollowRedirectsBackend( new AkkaHttpBackend(actorSystem, ec, @@ -341,8 +310,7 @@ object AkkaHttpBackend { customHttpsContext: Option[HttpsConnectionContext] = None, customConnectionPoolSettings: Option[ConnectionPoolSettings] = None, customLog: Option[LoggingAdapter] = None)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpBackend[Future, Source[ByteString, Any]] = + implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend(ActorSystem("sttp"), ec, terminateActorSystemOnClose = true, @@ -358,14 +326,12 @@ object AkkaHttpBackend { * e.g. mapping responses. Defaults to the global execution * context. */ - def usingActorSystem( - actorSystem: ActorSystem, - options: SttpBackendOptions = SttpBackendOptions.Default, - customHttpsContext: Option[HttpsConnectionContext] = None, - customConnectionPoolSettings: Option[ConnectionPoolSettings] = None, - customLog: Option[LoggingAdapter] = None)( - implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpBackend[Future, Source[ByteString, Any]] = + def usingActorSystem(actorSystem: ActorSystem, + options: SttpBackendOptions = SttpBackendOptions.Default, + customHttpsContext: Option[HttpsConnectionContext] = None, + customConnectionPoolSettings: Option[ConnectionPoolSettings] = None, + customLog: Option[LoggingAdapter] = None)( + implicit ec: ExecutionContext = ExecutionContext.Implicits.global): SttpBackend[Future, Source[ByteString, Any]] = AkkaHttpBackend(actorSystem, ec, terminateActorSystemOnClose = false, |