diff options
author | adamw <adam@warski.org> | 2017-07-08 20:01:53 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-07-08 20:01:53 +0200 |
commit | f5566b659c4d52de0ff157713fe920671f23a147 (patch) | |
tree | ded526e18d49a0715c3f6efa58204a4c353d30d8 /akka-http-handler/src/main/scala/com/softwaremill | |
parent | a2f3939a89c8b291c4697e822bb931703b3bd3ba (diff) | |
download | sttp-f5566b659c4d52de0ff157713fe920671f23a147.tar.gz sttp-f5566b659c4d52de0ff157713fe920671f23a147.tar.bz2 sttp-f5566b659c4d52de0ff157713fe920671f23a147.zip |
Setting bodies in the Akka impl
Diffstat (limited to 'akka-http-handler/src/main/scala/com/softwaremill')
-rw-r--r-- | akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala | 54 |
1 files changed, 32 insertions, 22 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index 78d6637..df40fc9 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -7,12 +7,13 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.`Content-Type` import akka.http.scaladsl.model.ContentTypes.`application/octet-stream` import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{Source, StreamConverters} import akka.util.ByteString import com.softwaremill.sttp._ import com.softwaremill.sttp.model._ import scala.concurrent.Future +import scala.util.{Failure, Success, Try} class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, Source[ByteString, Any]] { @@ -23,7 +24,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, import as.dispatcher override def send[T](r: Request, responseAs: ResponseAs[T, Source[ByteString, Any]]): Future[Response[T]] = { - requestToAkka(r).flatMap(setBodyOnAkka(r, r.body, _)).flatMap(Http().singleRequest(_)).flatMap { hr => + requestToAkka(r).map(setBodyOnAkka(r, r.body, _).get).flatMap(Http().singleRequest(_)).flatMap { hr => val code = hr.status.intValue() bodyFromAkka(responseAs, hr).map(Response(code, _)) } @@ -65,7 +66,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, private def requestToAkka(r: Request): Future[HttpRequest] = { val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method)) - val parsed = r.headers.map(h => HttpHeader.parse(h._1, h._2)) + val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2)) val errors = parsed.collect { case ParsingResult.Error(e) => e } @@ -80,36 +81,45 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, } } - private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Future[HttpRequest] = body match { - case NoBody => Future.successful(ar) - case StringBody(b, encoding) => Future.successful(ar.withEntity(b)) // TODO - case ByteArrayBody(b) => Future.successful(ar.withEntity(b)) - case ByteBufferBody(b) => Future.successful(ar.withEntity(ByteString(b))) - case InputStreamBody(b) => Future.successful(ar) //TODO - case FileBody(b) => Future.successful(ar)//TODO - case PathBody(b) => Future.successful(ar) //TODO - case sb@SerializableBody(_, _) => setSerializableBodyOnAkka(r, sb, ar) - } + private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Try[HttpRequest] = { + getContentTypeOrOctetStream(r).map { ct => + + def doSet(body: RequestBody): HttpRequest = body match { + case NoBody => ar + case StringBody(b, encoding) => + val ctWithEncoding = HttpCharsets.getForKey(encoding).map(hc => ContentType.apply(ct.mediaType, () => hc)).getOrElse(ct) + ar.withEntity(ctWithEncoding, b.getBytes(encoding)) + case ByteArrayBody(b) => ar.withEntity(b) + case ByteBufferBody(b) => ar.withEntity(ByteString(b)) + case InputStreamBody(b) => ar.withEntity(HttpEntity(ct, StreamConverters.fromInputStream(() => b))) + case FileBody(b) => ar.withEntity(ct, b.toPath) + case PathBody(b) => ar.withEntity(ct, b) + case s@SerializableBody(_, _) => doSetSerializable(s) + } - private def setSerializableBodyOnAkka[T](r: Request, body: SerializableBody[T], ar: HttpRequest): Future[HttpRequest] = body match { - case SerializableBody(SourceBodySerializer, t) => - getContentTypeOrOctetStream(r).map(ct => ar.withEntity(HttpEntity(ct, t))) + def doSetSerializable[T](body: SerializableBody[T]): HttpRequest = body match { + case SerializableBody(SourceBodySerializer, t) => ar.withEntity(HttpEntity(ct, t)) + case SerializableBody(f, t) => doSet(f(t)) + } - case SerializableBody(f, t) => setBodyOnAkka(r, f(t), ar) + doSet(body) + } } - private def getContentTypeOrOctetStream(r: Request): Future[ContentType] = { + private def getContentTypeOrOctetStream(r: Request): Try[ContentType] = { r.headers - .find(_._1.toLowerCase.contains(`Content-Type`.lowercaseName)) + .find(isContentType) .map(_._2) .map { ct => ContentType.parse(ct).fold( - errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), - Future.successful) + errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")), + Success(_)) } - .getOrElse(Future.successful(`application/octet-stream`)) + .getOrElse(Success(`application/octet-stream`)) } + private def isContentType(header: (String, String)) = header._1.toLowerCase.contains(`Content-Type`.lowercaseName) + def close(): Future[Terminated] = { actorSystem.terminate() } |