From f5566b659c4d52de0ff157713fe920671f23a147 Mon Sep 17 00:00:00 2001 From: adamw Date: Sat, 8 Jul 2017 20:01:53 +0200 Subject: Setting bodies in the Akka impl --- .../sttp/akkahttp/AkkaHttpSttpHandler.scala | 54 +++++++++++++--------- 1 file changed, 32 insertions(+), 22 deletions(-) (limited to 'akka-http-handler') diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index 78d6637..df40fc9 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -7,12 +7,13 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.`Content-Type` import akka.http.scaladsl.model.ContentTypes.`application/octet-stream` import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{Source, StreamConverters} import akka.util.ByteString import com.softwaremill.sttp._ import com.softwaremill.sttp.model._ import scala.concurrent.Future +import scala.util.{Failure, Success, Try} class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, Source[ByteString, Any]] { @@ -23,7 +24,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, import as.dispatcher override def send[T](r: Request, responseAs: ResponseAs[T, Source[ByteString, Any]]): Future[Response[T]] = { - requestToAkka(r).flatMap(setBodyOnAkka(r, r.body, _)).flatMap(Http().singleRequest(_)).flatMap { hr => + requestToAkka(r).map(setBodyOnAkka(r, r.body, _).get).flatMap(Http().singleRequest(_)).flatMap { hr => val code = hr.status.intValue() bodyFromAkka(responseAs, hr).map(Response(code, _)) } @@ -65,7 +66,7 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, private def requestToAkka(r: Request): Future[HttpRequest] = { val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method)) - val parsed = r.headers.map(h => HttpHeader.parse(h._1, h._2)) + val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2)) val errors = parsed.collect { case ParsingResult.Error(e) => e } @@ -80,36 +81,45 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, } } - private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Future[HttpRequest] = body match { - case NoBody => Future.successful(ar) - case StringBody(b, encoding) => Future.successful(ar.withEntity(b)) // TODO - case ByteArrayBody(b) => Future.successful(ar.withEntity(b)) - case ByteBufferBody(b) => Future.successful(ar.withEntity(ByteString(b))) - case InputStreamBody(b) => Future.successful(ar) //TODO - case FileBody(b) => Future.successful(ar)//TODO - case PathBody(b) => Future.successful(ar) //TODO - case sb@SerializableBody(_, _) => setSerializableBodyOnAkka(r, sb, ar) - } + private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Try[HttpRequest] = { + getContentTypeOrOctetStream(r).map { ct => + + def doSet(body: RequestBody): HttpRequest = body match { + case NoBody => ar + case StringBody(b, encoding) => + val ctWithEncoding = HttpCharsets.getForKey(encoding).map(hc => ContentType.apply(ct.mediaType, () => hc)).getOrElse(ct) + ar.withEntity(ctWithEncoding, b.getBytes(encoding)) + case ByteArrayBody(b) => ar.withEntity(b) + case ByteBufferBody(b) => ar.withEntity(ByteString(b)) + case InputStreamBody(b) => ar.withEntity(HttpEntity(ct, StreamConverters.fromInputStream(() => b))) + case FileBody(b) => ar.withEntity(ct, b.toPath) + case PathBody(b) => ar.withEntity(ct, b) + case s@SerializableBody(_, _) => doSetSerializable(s) + } - private def setSerializableBodyOnAkka[T](r: Request, body: SerializableBody[T], ar: HttpRequest): Future[HttpRequest] = body match { - case SerializableBody(SourceBodySerializer, t) => - getContentTypeOrOctetStream(r).map(ct => ar.withEntity(HttpEntity(ct, t))) + def doSetSerializable[T](body: SerializableBody[T]): HttpRequest = body match { + case SerializableBody(SourceBodySerializer, t) => ar.withEntity(HttpEntity(ct, t)) + case SerializableBody(f, t) => doSet(f(t)) + } - case SerializableBody(f, t) => setBodyOnAkka(r, f(t), ar) + doSet(body) + } } - private def getContentTypeOrOctetStream(r: Request): Future[ContentType] = { + private def getContentTypeOrOctetStream(r: Request): Try[ContentType] = { r.headers - .find(_._1.toLowerCase.contains(`Content-Type`.lowercaseName)) + .find(isContentType) .map(_._2) .map { ct => ContentType.parse(ct).fold( - errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), - Future.successful) + errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")), + Success(_)) } - .getOrElse(Future.successful(`application/octet-stream`)) + .getOrElse(Success(`application/octet-stream`)) } + private def isContentType(header: (String, String)) = header._1.toLowerCase.contains(`Content-Type`.lowercaseName) + def close(): Future[Terminated] = { actorSystem.terminate() } -- cgit v1.2.3