From b9776b71297d30f9a49e52ca0639e8bade9b7e05 Mon Sep 17 00:00:00 2001 From: adamw Date: Fri, 30 Jun 2017 13:02:46 +0200 Subject: Simple send stream --- .../sttp/akkahttp/AkkaHttpSttpHandler.scala | 59 ++++++++++++---------- .../com/softwaremill/sttp/akkahttp/package.scala | 14 +++++ 2 files changed, 46 insertions(+), 27 deletions(-) create mode 100644 akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala (limited to 'akka-http-handler/src/main/scala/com') diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index 483f0a7..0f08684 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -4,6 +4,8 @@ import akka.actor.{ActorSystem, Terminated} import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpHeader.ParsingResult import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.`Content-Type` +import akka.http.scaladsl.model.ContentTypes.`application/octet-stream` import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.util.ByteString @@ -20,40 +22,19 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Fu import as.dispatcher override def send[T](r: Request, responseAs: ResponseAs[T]): Future[Response[T]] = { - requestToAkka(r).flatMap(Http().singleRequest(_)).flatMap { hr => + requestToAkka(r).flatMap(setBody(r, r.body, _)).flatMap(Http().singleRequest(_)).flatMap { hr => val code = hr.status.intValue() bodyFromAkkaResponse(responseAs, hr).map(Response(code, _)) } } override def send(r: Request, responseAsStream: ResponseAsStream[Source[ByteString, Any]]): Future[Response[Source[ByteString, Any]]] = { - requestToAkka(r).flatMap(Http().singleRequest(_)).map { hr => + requestToAkka(r).flatMap(setBody(r, r.body, _)).flatMap(Http().singleRequest(_)).map { hr => val code = hr.status.intValue() Response(code, hr.entity.dataBytes) } } - override def sendStream[T](r: Request, contentType: String, stream: Source[ByteString, Any], - responseAs: ResponseAs[T]): Future[Response[T]] = { - - for { - ar <- requestToAkka(r) - ct <- contentTypeToAkka(contentType) - hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream))) - body <- bodyFromAkkaResponse(responseAs, hr) - } yield Response(hr.status.intValue(), body) - } - - override def sendStream(r: Request, contentType: String, stream: Source[ByteString, Any], - responseAsStream: ResponseAsStream[Source[ByteString, Any]]): Future[Response[Source[ByteString, Any]]] = { - - for { - ar <- requestToAkka(r) - ct <- contentTypeToAkka(contentType) - hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream))) - } yield Response(hr.status.intValue(), hr.entity.dataBytes) - } - private def convertMethod(m: Method): HttpMethod = m match { case Method.GET => HttpMethods.GET case Method.POST => HttpMethods.POST @@ -95,10 +76,34 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Fu } } - private def contentTypeToAkka(ct: String): Future[ContentType] = { - ContentType.parse(ct).fold( - errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), - Future.successful) + private def setBody(r: Request, body: RequestBody, ar: HttpRequest): Future[HttpRequest] = body match { + case NoBody => Future.successful(ar) + case StringBody(b) => Future.successful(ar.withEntity(b)) + case ByteArrayBody(b) => Future.successful(ar.withEntity(b)) + case ByteBufferBody(b) => Future.successful(ar.withEntity(ByteString(b))) + case InputStreamBody(b) => Future.successful(ar) //TODO + case FileBody(b) => Future.successful(ar)//TODO + case PathBody(b) => Future.successful(ar) //TODO + case sb@SerializableBody(_, _) => setSerializableBody(r, sb, ar) + } + + private def setSerializableBody[T](r: Request, body: SerializableBody[T], ar: HttpRequest): Future[HttpRequest] = body match { + case SerializableBody(SourceBodySerializer, t) => + getContentTypeOrOctetStream(r).map(ct => ar.withEntity(HttpEntity(ct, t))) + + case SerializableBody(f, t) => setBody(r, f(t), ar) + } + + private def getContentTypeOrOctetStream(r: Request): Future[ContentType] = { + r.headers + .find(_._1.toLowerCase.contains(`Content-Type`.lowercaseName)) + .map(_._2) + .map { ct => + ContentType.parse(ct).fold( + errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), + Future.successful) + } + .getOrElse(Future.successful(`application/octet-stream`)) } def close(): Future[Terminated] = { diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala new file mode 100644 index 0000000..669af5e --- /dev/null +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala @@ -0,0 +1,14 @@ +package com.softwaremill.sttp + +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.softwaremill.sttp.model.{BasicRequestBody, BodySerializer} + +package object akkahttp { + private[akkahttp] case object SourceBodySerializer extends BodySerializer[Source[ByteString, Any]] { + def apply(t: Source[ByteString, Any]): BasicRequestBody = + throw new RuntimeException("Can only be used with akka-http handler") + } + + implicit val sourceBodySerializer: BodySerializer[Source[ByteString, Any]] = SourceBodySerializer +} -- cgit v1.2.3