From 034c40595f217ef1f11ca351666a03aa08976b81 Mon Sep 17 00:00:00 2001 From: adamw Date: Thu, 29 Jun 2017 17:24:46 +0200 Subject: Initital draft --- .../sttp/akkahttp/AkkaHttpSttpHandler.scala | 89 ++++++++++++++++++++++ .../com/softwaremill/sttp/akkahttp/package.scala | 14 ++++ 2 files changed, 103 insertions(+) create mode 100644 akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala create mode 100644 akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala (limited to 'akka-http-handler/src') diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala new file mode 100644 index 0000000..e167dd5 --- /dev/null +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -0,0 +1,89 @@ +package com.softwaremill.sttp.akkahttp + +import akka.actor.{ActorSystem, Terminated} +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.HttpHeader.ParsingResult +import akka.http.scaladsl.model._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.softwaremill.sttp.{IgnoreResponseBody, Method, Request, Response, ResponseBodyReader, SttpStreamHandler} + +import scala.concurrent.Future + +class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Future, Source[ByteString, Any]] { + def this() = this(ActorSystem("sttp")) + + private implicit val as = actorSystem + private implicit val materializer = ActorMaterializer() + import as.dispatcher + + override def send[T](r: Request, responseReader: ResponseBodyReader[T]): Future[Response[T]] = { + requestToAkka(r).flatMap { ar => + Http().singleRequest(ar).flatMap { hr => + val code = hr.status.intValue() + bodyFromAkkaResponse(responseReader, hr).map(Response(code, _)) + } + } + } + + override def sendStream[T](r: Request, contentType: String, stream: Source[ByteString, Any], + responseReader: ResponseBodyReader[T]): Future[Response[T]] = { + + for { + ar <- requestToAkka(r) + ct <- contentTypeToAkka(contentType) + hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream))) + body <- bodyFromAkkaResponse(responseReader, hr) + } yield Response(hr.status.intValue(), body) + } + + private def convertMethod(m: Method): HttpMethod = m match { + case Method.GET => HttpMethods.GET + case Method.POST => HttpMethods.POST + case _ => HttpMethod.custom(m.m) + } + + private def bodyFromAkkaResponse[T](rr: ResponseBodyReader[T], hr: HttpResponse): Future[T] = rr match { + case IgnoreResponseBody => + hr.discardEntityBytes() + Future.successful(()) + + case AkkaStreamsSourceResponseBody => + Future.successful(hr.entity.dataBytes) + + case _ => + hr.entity.dataBytes + .runFold(ByteString(""))(_ ++ _) + .map(_.toArray[Byte]) + .map(rr.fromBytes) + } + + private def requestToAkka(r: Request): Future[HttpRequest] = { + val ar = HttpRequest(uri = r.uri.toString, method = convertMethod(r.method)) + val parsed = r.headers.map(h => HttpHeader.parse(h._1, h._2)) + val errors = parsed.collect { + case ParsingResult.Error(e) => e + } + if (errors.isEmpty) { + val headers = parsed.collect { + case ParsingResult.Ok(h, _) => h + } + + Future.successful(ar.withHeaders(headers.toList)) + } else { + Future.failed(new RuntimeException(s"Cannot parse headers: $errors")) + } + } + + private def contentTypeToAkka(ct: String): Future[ContentType] = { + ContentType.parse(ct).fold( + errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), + Future.successful) + } + + def close(): Future[Terminated] = { + actorSystem.terminate() + } +} + diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala new file mode 100644 index 0000000..3b9a1aa --- /dev/null +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala @@ -0,0 +1,14 @@ +package com.softwaremill.sttp + +import java.io.InputStream + +import akka.stream.scaladsl.Source +import akka.util.ByteString + +package object akkahttp { + implicit object AkkaStreamsSourceResponseBody extends ResponseBodyReader[Source[ByteString, Any]] { + override def fromInputStream(is: InputStream): Source[ByteString, Any] = ??? + + override def fromBytes(bytes: Array[Byte]): Source[ByteString, Any] = ??? + } +} -- cgit v1.2.3