aboutsummaryrefslogtreecommitdiff
path: root/akka-http-handler
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-07-08 20:01:53 +0200
committeradamw <adam@warski.org>2017-07-08 20:01:53 +0200
commitf5566b659c4d52de0ff157713fe920671f23a147 (patch)
treeded526e18d49a0715c3f6efa58204a4c353d30d8 /akka-http-handler
parenta2f3939a89c8b291c4697e822bb931703b3bd3ba (diff)
downloadsttp-f5566b659c4d52de0ff157713fe920671f23a147.tar.gz
sttp-f5566b659c4d52de0ff157713fe920671f23a147.tar.bz2
sttp-f5566b659c4d52de0ff157713fe920671f23a147.zip
Setting bodies in the Akka impl
Diffstat (limited to 'akka-http-handler')
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala54
1 files changed, 32 insertions, 22 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
index 78d6637..df40fc9 100644
--- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
@@ -7,12 +7,13 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.`Content-Type`
import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Source
+import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import com.softwaremill.sttp._
import com.softwaremill.sttp.model._
import scala.concurrent.Future
+import scala.util.{Failure, Success, Try}
class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, Source[ByteString, Any]] {
@@ -23,7 +24,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
import as.dispatcher
override def send[T](r: Request, responseAs: ResponseAs[T, Source[ByteString, Any]]): Future[Response[T]] = {
- requestToAkka(r).flatMap(setBodyOnAkka(r, r.body, _)).flatMap(Http().singleRequest(_)).flatMap { hr =>
+ requestToAkka(r).map(setBodyOnAkka(r, r.body, _).get).flatMap(Http().singleRequest(_)).flatMap { hr =>
val code = hr.status.intValue()
bodyFromAkka(responseAs, hr).map(Response(code, _))
}
@@ -65,7 +66,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
private def requestToAkka(r: Request): Future[HttpRequest] = {
val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method))
- val parsed = r.headers.map(h => HttpHeader.parse(h._1, h._2))
+ val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2))
val errors = parsed.collect {
case ParsingResult.Error(e) => e
}
@@ -80,36 +81,45 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future,
}
}
- private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Future[HttpRequest] = body match {
- case NoBody => Future.successful(ar)
- case StringBody(b, encoding) => Future.successful(ar.withEntity(b)) // TODO
- case ByteArrayBody(b) => Future.successful(ar.withEntity(b))
- case ByteBufferBody(b) => Future.successful(ar.withEntity(ByteString(b)))
- case InputStreamBody(b) => Future.successful(ar) //TODO
- case FileBody(b) => Future.successful(ar)//TODO
- case PathBody(b) => Future.successful(ar) //TODO
- case sb@SerializableBody(_, _) => setSerializableBodyOnAkka(r, sb, ar)
- }
+ private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Try[HttpRequest] = {
+ getContentTypeOrOctetStream(r).map { ct =>
+
+ def doSet(body: RequestBody): HttpRequest = body match {
+ case NoBody => ar
+ case StringBody(b, encoding) =>
+ val ctWithEncoding = HttpCharsets.getForKey(encoding).map(hc => ContentType.apply(ct.mediaType, () => hc)).getOrElse(ct)
+ ar.withEntity(ctWithEncoding, b.getBytes(encoding))
+ case ByteArrayBody(b) => ar.withEntity(b)
+ case ByteBufferBody(b) => ar.withEntity(ByteString(b))
+ case InputStreamBody(b) => ar.withEntity(HttpEntity(ct, StreamConverters.fromInputStream(() => b)))
+ case FileBody(b) => ar.withEntity(ct, b.toPath)
+ case PathBody(b) => ar.withEntity(ct, b)
+ case s@SerializableBody(_, _) => doSetSerializable(s)
+ }
- private def setSerializableBodyOnAkka[T](r: Request, body: SerializableBody[T], ar: HttpRequest): Future[HttpRequest] = body match {
- case SerializableBody(SourceBodySerializer, t) =>
- getContentTypeOrOctetStream(r).map(ct => ar.withEntity(HttpEntity(ct, t)))
+ def doSetSerializable[T](body: SerializableBody[T]): HttpRequest = body match {
+ case SerializableBody(SourceBodySerializer, t) => ar.withEntity(HttpEntity(ct, t))
+ case SerializableBody(f, t) => doSet(f(t))
+ }
- case SerializableBody(f, t) => setBodyOnAkka(r, f(t), ar)
+ doSet(body)
+ }
}
- private def getContentTypeOrOctetStream(r: Request): Future[ContentType] = {
+ private def getContentTypeOrOctetStream(r: Request): Try[ContentType] = {
r.headers
- .find(_._1.toLowerCase.contains(`Content-Type`.lowercaseName))
+ .find(isContentType)
.map(_._2)
.map { ct =>
ContentType.parse(ct).fold(
- errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")),
- Future.successful)
+ errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")),
+ Success(_))
}
- .getOrElse(Future.successful(`application/octet-stream`))
+ .getOrElse(Success(`application/octet-stream`))
}
+ private def isContentType(header: (String, String)) = header._1.toLowerCase.contains(`Content-Type`.lowercaseName)
+
def close(): Future[Terminated] = {
actorSystem.terminate()
}