aboutsummaryrefslogtreecommitdiff
path: root/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala')
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala33
1 files changed, 14 insertions, 19 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
index 866aaad..62e066b 100644
--- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
@@ -19,22 +19,23 @@ import scala.collection.immutable.Seq
class AkkaHttpSttpHandler(actorSystem: ActorSystem)
extends SttpHandler[Future, Source[ByteString, Any]] {
+ // the supported stream type
+ private type S = Source[ByteString, Any]
+
def this() = this(ActorSystem("sttp"))
private implicit val as = actorSystem
private implicit val materializer = ActorMaterializer()
import as.dispatcher
- override def send[T](r: Request,
- responseAs: ResponseAs[T, Source[ByteString, Any]])
- : Future[Response[T]] = {
+ override def send[T](r: Request[T, S]): Future[Response[T]] = {
requestToAkka(r)
.map(setBodyOnAkka(r, r.body, _).get)
.flatMap(Http().singleRequest(_))
.flatMap { hr =>
val code = hr.status.intValue()
- bodyFromAkka(responseAs, hr).map(
- Response(_, code, headersFromAkka(hr)))
+ bodyFromAkka(r.responseAs, hr)
+ .map(Response(_, code, headersFromAkka(hr)))
}
}
@@ -51,7 +52,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
case _ => HttpMethod.custom(m.m)
}
- private def bodyFromAkka[T](rr: ResponseAs[T, Source[ByteString, Any]],
+ private def bodyFromAkka[T](rr: ResponseAs[T, S],
hr: HttpResponse): Future[T] = {
def asByteArray =
hr.entity.dataBytes
@@ -88,7 +89,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
ch :: (cl.toList ++ other)
}
- private def requestToAkka(r: Request): Future[HttpRequest] = {
+ private def requestToAkka(r: Request[_, S]): Future[HttpRequest] = {
val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method))
val parsed =
r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2))
@@ -106,11 +107,11 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
}
}
- private def setBodyOnAkka(r: Request,
- body: RequestBody,
+ private def setBodyOnAkka(r: Request[_, S],
+ body: RequestBody[S],
ar: HttpRequest): Try[HttpRequest] = {
getContentTypeOrOctetStream(r).map { ct =>
- def doSet(body: RequestBody): HttpRequest = body match {
+ def doSet(body: RequestBody[S]): HttpRequest = body match {
case NoBody => ar
case StringBody(b, encoding) =>
val ctWithEncoding = HttpCharsets
@@ -124,21 +125,15 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem)
ar.withEntity(
HttpEntity(ct, StreamConverters.fromInputStream(() => b)))
case PathBody(b) => ar.withEntity(ct, b)
- case s @ SerializableBody(_, _) => doSetSerializable(s)
+ case StreamBody(s) => ar.withEntity(HttpEntity(ct, s))
+ case SerializableBody(f, t) => doSet(f(t))
}
- def doSetSerializable[T](body: SerializableBody[T]): HttpRequest =
- body match {
- case SerializableBody(SourceBodySerializer, t) =>
- ar.withEntity(HttpEntity(ct, t))
- case SerializableBody(f, t) => doSet(f(t))
- }
-
doSet(body)
}
}
- private def getContentTypeOrOctetStream(r: Request): Try[ContentType] = {
+ private def getContentTypeOrOctetStream(r: Request[_, S]): Try[ContentType] = {
r.headers
.find(isContentType)
.map(_._2)