aboutsummaryrefslogtreecommitdiff
path: root/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala')
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala74
1 files changed, 48 insertions, 26 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
index df40fc9..fc2d632 100644
--- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
@@ -15,7 +15,8 @@ import com.softwaremill.sttp.model._
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
-class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, Source[ByteString, Any]] {
+class AkkaHttpSttpHandler(actorSystem: ActorSystem)
+ extends SttpHandler[Future, Source[ByteString, Any]] {
def this() = this(ActorSystem("sttp"))
@@ -23,11 +24,16 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
private implicit val materializer = ActorMaterializer()
import as.dispatcher
- override def send[T](r: Request, responseAs: ResponseAs[T, Source[ByteString, Any]]): Future[Response[T]] = {
- requestToAkka(r).map(setBodyOnAkka(r, r.body, _).get).flatMap(Http().singleRequest(_)).flatMap { hr =>
- val code = hr.status.intValue()
- bodyFromAkka(responseAs, hr).map(Response(code, _))
- }
+ override def send[T](r: Request,
+ responseAs: ResponseAs[T, Source[ByteString, Any]])
+ : Future[Response[T]] = {
+ requestToAkka(r)
+ .map(setBodyOnAkka(r, r.body, _).get)
+ .flatMap(Http().singleRequest(_))
+ .flatMap { hr =>
+ val code = hr.status.intValue()
+ bodyFromAkka(responseAs, hr).map(Response(code, _))
+ }
}
private def methodToAkka(m: Method): HttpMethod = m match {
@@ -43,10 +49,12 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
case _ => HttpMethod.custom(m.m)
}
- private def bodyFromAkka[T](rr: ResponseAs[T, Source[ByteString, Any]], hr: HttpResponse): Future[T] = {
- def asByteArray = hr.entity.dataBytes
- .runFold(ByteString(""))(_ ++ _)
- .map(_.toArray[Byte])
+ private def bodyFromAkka[T](rr: ResponseAs[T, Source[ByteString, Any]],
+ hr: HttpResponse): Future[T] = {
+ def asByteArray =
+ hr.entity.dataBytes
+ .runFold(ByteString(""))(_ ++ _)
+ .map(_.toArray[Byte])
rr match {
case IgnoreResponse =>
@@ -59,14 +67,15 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
case ResponseAsByteArray =>
asByteArray
- case r@ResponseAsStream() =>
+ case r @ ResponseAsStream() =>
Future.successful(r.responseIsStream(hr.entity.dataBytes))
}
}
private def requestToAkka(r: Request): Future[HttpRequest] = {
val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method))
- val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2))
+ val parsed =
+ r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2))
val errors = parsed.collect {
case ParsingResult.Error(e) => e
}
@@ -81,26 +90,34 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
}
}
- private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Try[HttpRequest] = {
+ private def setBodyOnAkka(r: Request,
+ body: RequestBody,
+ ar: HttpRequest): Try[HttpRequest] = {
getContentTypeOrOctetStream(r).map { ct =>
-
def doSet(body: RequestBody): HttpRequest = body match {
case NoBody => ar
case StringBody(b, encoding) =>
- val ctWithEncoding = HttpCharsets.getForKey(encoding).map(hc => ContentType.apply(ct.mediaType, () => hc)).getOrElse(ct)
+ val ctWithEncoding = HttpCharsets
+ .getForKey(encoding)
+ .map(hc => ContentType.apply(ct.mediaType, () => hc))
+ .getOrElse(ct)
ar.withEntity(ctWithEncoding, b.getBytes(encoding))
case ByteArrayBody(b) => ar.withEntity(b)
case ByteBufferBody(b) => ar.withEntity(ByteString(b))
- case InputStreamBody(b) => ar.withEntity(HttpEntity(ct, StreamConverters.fromInputStream(() => b)))
+ case InputStreamBody(b) =>
+ ar.withEntity(
+ HttpEntity(ct, StreamConverters.fromInputStream(() => b)))
case FileBody(b) => ar.withEntity(ct, b.toPath)
case PathBody(b) => ar.withEntity(ct, b)
- case s@SerializableBody(_, _) => doSetSerializable(s)
+ case s @ SerializableBody(_, _) => doSetSerializable(s)
}
- def doSetSerializable[T](body: SerializableBody[T]): HttpRequest = body match {
- case SerializableBody(SourceBodySerializer, t) => ar.withEntity(HttpEntity(ct, t))
- case SerializableBody(f, t) => doSet(f(t))
- }
+ def doSetSerializable[T](body: SerializableBody[T]): HttpRequest =
+ body match {
+ case SerializableBody(SourceBodySerializer, t) =>
+ ar.withEntity(HttpEntity(ct, t))
+ case SerializableBody(f, t) => doSet(f(t))
+ }
doSet(body)
}
@@ -111,16 +128,21 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
.find(isContentType)
.map(_._2)
.map { ct =>
- ContentType.parse(ct).fold(
- errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")),
- Success(_))
+ ContentType
+ .parse(ct)
+ .fold(
+ errors =>
+ Failure(
+ new RuntimeException(s"Cannot parse content type: $errors")),
+ Success(_))
}
.getOrElse(Success(`application/octet-stream`))
}
- private def isContentType(header: (String, String)) = header._1.toLowerCase.contains(`Content-Type`.lowercaseName)
+ private def isContentType(header: (String, String)) =
+ header._1.toLowerCase.contains(`Content-Type`.lowercaseName)
def close(): Future[Terminated] = {
actorSystem.terminate()
}
-} \ No newline at end of file
+}