diff options
author | adamw <adam@warski.org> | 2017-07-09 09:32:31 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-07-09 09:32:31 +0200 |
commit | 820095216e671888cfa607b329d749ba099a7bc1 (patch) | |
tree | 9276e9ec7be44ccc78dfdf9f2a93475deb36d6ff /akka-http-handler | |
parent | 08dd8ea1b55e62925aeee72de5d63a1e1961b060 (diff) | |
download | sttp-820095216e671888cfa607b329d749ba099a7bc1.tar.gz sttp-820095216e671888cfa607b329d749ba099a7bc1.tar.bz2 sttp-820095216e671888cfa607b329d749ba099a7bc1.zip |
Source formatting
Diffstat (limited to 'akka-http-handler')
-rw-r--r-- | akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala | 74 | ||||
-rw-r--r-- | akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala | 6 |
2 files changed, 52 insertions, 28 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index df40fc9..fc2d632 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -15,7 +15,8 @@ import com.softwaremill.sttp.model._ import scala.concurrent.Future import scala.util.{Failure, Success, Try} -class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, Source[ByteString, Any]] { +class AkkaHttpSttpHandler(actorSystem: ActorSystem) + extends SttpHandler[Future, Source[ByteString, Any]] { def this() = this(ActorSystem("sttp")) @@ -23,11 +24,16 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, private implicit val materializer = ActorMaterializer() import as.dispatcher - override def send[T](r: Request, responseAs: ResponseAs[T, Source[ByteString, Any]]): Future[Response[T]] = { - requestToAkka(r).map(setBodyOnAkka(r, r.body, _).get).flatMap(Http().singleRequest(_)).flatMap { hr => - val code = hr.status.intValue() - bodyFromAkka(responseAs, hr).map(Response(code, _)) - } + override def send[T](r: Request, + responseAs: ResponseAs[T, Source[ByteString, Any]]) + : Future[Response[T]] = { + requestToAkka(r) + .map(setBodyOnAkka(r, r.body, _).get) + .flatMap(Http().singleRequest(_)) + .flatMap { hr => + val code = hr.status.intValue() + bodyFromAkka(responseAs, hr).map(Response(code, _)) + } } private def methodToAkka(m: Method): HttpMethod = m match { @@ -43,10 +49,12 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, case _ => HttpMethod.custom(m.m) } - private def bodyFromAkka[T](rr: ResponseAs[T, Source[ByteString, Any]], hr: HttpResponse): Future[T] = { - def asByteArray = hr.entity.dataBytes - .runFold(ByteString(""))(_ ++ _) - .map(_.toArray[Byte]) + private def bodyFromAkka[T](rr: ResponseAs[T, Source[ByteString, Any]], + hr: HttpResponse): Future[T] = { + def asByteArray = + hr.entity.dataBytes + .runFold(ByteString(""))(_ ++ _) + .map(_.toArray[Byte]) rr match { case IgnoreResponse => @@ -59,14 +67,15 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, case ResponseAsByteArray => asByteArray - case r@ResponseAsStream() => + case r @ ResponseAsStream() => Future.successful(r.responseIsStream(hr.entity.dataBytes)) } } private def requestToAkka(r: Request): Future[HttpRequest] = { val ar = HttpRequest(uri = r.uri.toString, method = methodToAkka(r.method)) - val parsed = r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2)) + val parsed = + r.headers.filterNot(isContentType).map(h => HttpHeader.parse(h._1, h._2)) val errors = parsed.collect { case ParsingResult.Error(e) => e } @@ -81,26 +90,34 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, } } - private def setBodyOnAkka(r: Request, body: RequestBody, ar: HttpRequest): Try[HttpRequest] = { + private def setBodyOnAkka(r: Request, + body: RequestBody, + ar: HttpRequest): Try[HttpRequest] = { getContentTypeOrOctetStream(r).map { ct => - def doSet(body: RequestBody): HttpRequest = body match { case NoBody => ar case StringBody(b, encoding) => - val ctWithEncoding = HttpCharsets.getForKey(encoding).map(hc => ContentType.apply(ct.mediaType, () => hc)).getOrElse(ct) + val ctWithEncoding = HttpCharsets + .getForKey(encoding) + .map(hc => ContentType.apply(ct.mediaType, () => hc)) + .getOrElse(ct) ar.withEntity(ctWithEncoding, b.getBytes(encoding)) case ByteArrayBody(b) => ar.withEntity(b) case ByteBufferBody(b) => ar.withEntity(ByteString(b)) - case InputStreamBody(b) => ar.withEntity(HttpEntity(ct, StreamConverters.fromInputStream(() => b))) + case InputStreamBody(b) => + ar.withEntity( + HttpEntity(ct, StreamConverters.fromInputStream(() => b))) case FileBody(b) => ar.withEntity(ct, b.toPath) case PathBody(b) => ar.withEntity(ct, b) - case s@SerializableBody(_, _) => doSetSerializable(s) + case s @ SerializableBody(_, _) => doSetSerializable(s) } - def doSetSerializable[T](body: SerializableBody[T]): HttpRequest = body match { - case SerializableBody(SourceBodySerializer, t) => ar.withEntity(HttpEntity(ct, t)) - case SerializableBody(f, t) => doSet(f(t)) - } + def doSetSerializable[T](body: SerializableBody[T]): HttpRequest = + body match { + case SerializableBody(SourceBodySerializer, t) => + ar.withEntity(HttpEntity(ct, t)) + case SerializableBody(f, t) => doSet(f(t)) + } doSet(body) } @@ -111,16 +128,21 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpHandler[Future, .find(isContentType) .map(_._2) .map { ct => - ContentType.parse(ct).fold( - errors => Failure(new RuntimeException(s"Cannot parse content type: $errors")), - Success(_)) + ContentType + .parse(ct) + .fold( + errors => + Failure( + new RuntimeException(s"Cannot parse content type: $errors")), + Success(_)) } .getOrElse(Success(`application/octet-stream`)) } - private def isContentType(header: (String, String)) = header._1.toLowerCase.contains(`Content-Type`.lowercaseName) + private def isContentType(header: (String, String)) = + header._1.toLowerCase.contains(`Content-Type`.lowercaseName) def close(): Future[Terminated] = { actorSystem.terminate() } -}
\ No newline at end of file +} diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala index 669af5e..0357dd8 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala @@ -5,10 +5,12 @@ import akka.util.ByteString import com.softwaremill.sttp.model.{BasicRequestBody, BodySerializer} package object akkahttp { - private[akkahttp] case object SourceBodySerializer extends BodySerializer[Source[ByteString, Any]] { + private[akkahttp] case object SourceBodySerializer + extends BodySerializer[Source[ByteString, Any]] { def apply(t: Source[ByteString, Any]): BasicRequestBody = throw new RuntimeException("Can only be used with akka-http handler") } - implicit val sourceBodySerializer: BodySerializer[Source[ByteString, Any]] = SourceBodySerializer + implicit val sourceBodySerializer: BodySerializer[Source[ByteString, Any]] = + SourceBodySerializer } |