diff options
author | adamw <adam@warski.org> | 2017-06-30 13:02:46 +0200 |
---|---|---|
committer | adamw <adam@warski.org> | 2017-06-30 13:02:46 +0200 |
commit | b9776b71297d30f9a49e52ca0639e8bade9b7e05 (patch) | |
tree | a277c32e6061291de97e0d3fbe3da0a1d31a0646 | |
parent | 3664508d913fa5787ec6144fee6a12ecfe2f14ae (diff) | |
download | sttp-b9776b71297d30f9a49e52ca0639e8bade9b7e05.tar.gz sttp-b9776b71297d30f9a49e52ca0639e8bade9b7e05.tar.bz2 sttp-b9776b71297d30f9a49e52ca0639e8bade9b7e05.zip |
Simple send stream
6 files changed, 67 insertions, 53 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala index 483f0a7..0f08684 100644 --- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala @@ -4,6 +4,8 @@ import akka.actor.{ActorSystem, Terminated} import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpHeader.ParsingResult import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.`Content-Type` +import akka.http.scaladsl.model.ContentTypes.`application/octet-stream` import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.util.ByteString @@ -20,40 +22,19 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Fu import as.dispatcher override def send[T](r: Request, responseAs: ResponseAs[T]): Future[Response[T]] = { - requestToAkka(r).flatMap(Http().singleRequest(_)).flatMap { hr => + requestToAkka(r).flatMap(setBody(r, r.body, _)).flatMap(Http().singleRequest(_)).flatMap { hr => val code = hr.status.intValue() bodyFromAkkaResponse(responseAs, hr).map(Response(code, _)) } } override def send(r: Request, responseAsStream: ResponseAsStream[Source[ByteString, Any]]): Future[Response[Source[ByteString, Any]]] = { - requestToAkka(r).flatMap(Http().singleRequest(_)).map { hr => + requestToAkka(r).flatMap(setBody(r, r.body, _)).flatMap(Http().singleRequest(_)).map { hr => val code = hr.status.intValue() Response(code, hr.entity.dataBytes) } } - override def sendStream[T](r: Request, contentType: String, stream: Source[ByteString, Any], - responseAs: ResponseAs[T]): Future[Response[T]] = { - - for { - ar <- requestToAkka(r) - ct <- contentTypeToAkka(contentType) - hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream))) - body <- bodyFromAkkaResponse(responseAs, hr) - } yield Response(hr.status.intValue(), body) - } - - override def sendStream(r: Request, contentType: String, stream: Source[ByteString, Any], - responseAsStream: ResponseAsStream[Source[ByteString, Any]]): Future[Response[Source[ByteString, Any]]] = { - - for { - ar <- requestToAkka(r) - ct <- contentTypeToAkka(contentType) - hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream))) - } yield Response(hr.status.intValue(), hr.entity.dataBytes) - } - private def convertMethod(m: Method): HttpMethod = m match { case Method.GET => HttpMethods.GET case Method.POST => HttpMethods.POST @@ -95,10 +76,34 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Fu } } - private def contentTypeToAkka(ct: String): Future[ContentType] = { - ContentType.parse(ct).fold( - errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), - Future.successful) + private def setBody(r: Request, body: RequestBody, ar: HttpRequest): Future[HttpRequest] = body match { + case NoBody => Future.successful(ar) + case StringBody(b) => Future.successful(ar.withEntity(b)) + case ByteArrayBody(b) => Future.successful(ar.withEntity(b)) + case ByteBufferBody(b) => Future.successful(ar.withEntity(ByteString(b))) + case InputStreamBody(b) => Future.successful(ar) //TODO + case FileBody(b) => Future.successful(ar)//TODO + case PathBody(b) => Future.successful(ar) //TODO + case sb@SerializableBody(_, _) => setSerializableBody(r, sb, ar) + } + + private def setSerializableBody[T](r: Request, body: SerializableBody[T], ar: HttpRequest): Future[HttpRequest] = body match { + case SerializableBody(SourceBodySerializer, t) => + getContentTypeOrOctetStream(r).map(ct => ar.withEntity(HttpEntity(ct, t))) + + case SerializableBody(f, t) => setBody(r, f(t), ar) + } + + private def getContentTypeOrOctetStream(r: Request): Future[ContentType] = { + r.headers + .find(_._1.toLowerCase.contains(`Content-Type`.lowercaseName)) + .map(_._2) + .map { ct => + ContentType.parse(ct).fold( + errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")), + Future.successful) + } + .getOrElse(Future.successful(`application/octet-stream`)) } def close(): Future[Terminated] = { diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala new file mode 100644 index 0000000..669af5e --- /dev/null +++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala @@ -0,0 +1,14 @@ +package com.softwaremill.sttp + +import akka.stream.scaladsl.Source +import akka.util.ByteString +import com.softwaremill.sttp.model.{BasicRequestBody, BodySerializer} + +package object akkahttp { + private[akkahttp] case object SourceBodySerializer extends BodySerializer[Source[ByteString, Any]] { + def apply(t: Source[ByteString, Any]): BasicRequestBody = + throw new RuntimeException("Can only be used with akka-http handler") + } + + implicit val sourceBodySerializer: BodySerializer[Source[ByteString, Any]] = SourceBodySerializer +} diff --git a/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala index b9c6af4..7ed7cc7 100644 --- a/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala +++ b/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala @@ -16,14 +16,14 @@ object HttpConnectionSttpHandler extends SttpHandler[Id] { c.setRequestMethod(r.method.m) r.headers.foreach { case (k, v) => c.setRequestProperty(k, v) } c.setDoInput(true) - setBody(r, c) + setBody(r.body, c) val status = c.getResponseCode Response(status, readResponse(c.getInputStream, responseAs)) } - private def setBody(r: Request, c: HttpURLConnection): Unit = { - if (r.body != NoBody) c.setDoOutput(true) + private def setBody(body: RequestBody, c: HttpURLConnection): Unit = { + if (body != NoBody) c.setDoOutput(true) def copyStream(in: InputStream, out: OutputStream): Unit = { val buf = new Array[Byte](1024) @@ -40,7 +40,7 @@ object HttpConnectionSttpHandler extends SttpHandler[Id] { doCopy() } - r.body match { + body match { case NoBody => // skip case StringBody(b) => @@ -57,14 +57,14 @@ object HttpConnectionSttpHandler extends SttpHandler[Id] { case InputStreamBody(b) => copyStream(b, c.getOutputStream) - case InputStreamSupplierBody(b) => - copyStream(b(), c.getOutputStream) - case FileBody(b) => Files.copy(b.toPath, c.getOutputStream) case PathBody(b) => Files.copy(b, c.getOutputStream) + + case SerializableBody(f, t) => + setBody(f(t), c) } } diff --git a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala index aaab92d..05c7199 100644 --- a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala +++ b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala @@ -10,6 +10,4 @@ trait SttpHandler[R[_]] { trait SttpStreamHandler[R[_], S] extends SttpHandler[R] { def send(request: Request, responseAsStream: ResponseAsStream[S]): R[Response[S]] - def sendStream[T](request: Request, contentType: String, stream: S, responseAs: ResponseAs[T]): R[Response[T]] - def sendStream(request: Request, contentType: String, stream: S, responseAsStream: ResponseAsStream[S]): R[Response[S]] }
\ No newline at end of file diff --git a/core/src/main/scala/com/softwaremill/sttp/model/package.scala b/core/src/main/scala/com/softwaremill/sttp/model/package.scala index 00f1261..df9ef1b 100644 --- a/core/src/main/scala/com/softwaremill/sttp/model/package.scala +++ b/core/src/main/scala/com/softwaremill/sttp/model/package.scala @@ -16,14 +16,21 @@ package object model { val PATCH = Method("PATCH") } + /** + * Provide an implicit value of this type to serialize arbitrary classes into a request body. + * Handlers might also provide special logic for serializer instances which they define (e.g. to handle streaming). + */ + type BodySerializer[T] = T => BasicRequestBody + sealed trait RequestBody - sealed trait BasicRequestBody extends RequestBody case object NoBody extends RequestBody + case class SerializableBody[T](f: BodySerializer[T], t: T) extends RequestBody + + sealed trait BasicRequestBody extends RequestBody case class StringBody(s: String) extends BasicRequestBody case class ByteArrayBody(b: Array[Byte]) extends BasicRequestBody case class ByteBufferBody(b: ByteBuffer) extends BasicRequestBody case class InputStreamBody(b: InputStream) extends BasicRequestBody - case class InputStreamSupplierBody(b: () => InputStream) extends BasicRequestBody case class FileBody(f: File) extends BasicRequestBody case class PathBody(f: Path) extends BasicRequestBody @@ -32,5 +39,5 @@ package object model { case class ResponseAsString(encoding: String) extends ResponseAs[String] object ResponseAsByteArray extends ResponseAs[Array[Byte]] - case class ResponseAsStream[-S]() + case class ResponseAsStream[S]() } diff --git a/core/src/main/scala/com/softwaremill/sttp/package.scala b/core/src/main/scala/com/softwaremill/sttp/package.scala index b007d6a..c787643 100644 --- a/core/src/main/scala/com/softwaremill/sttp/package.scala +++ b/core/src/main/scala/com/softwaremill/sttp/package.scala @@ -79,7 +79,6 @@ package object sttp { def multiPart(name: String, data: Array[Byte]): MultiPart = MultiPart(name, ByteArrayBody(data)) def multiPart(name: String, data: ByteBuffer): MultiPart = MultiPart(name, ByteBufferBody(data)) def multiPart(name: String, data: InputStream): MultiPart = MultiPart(name, InputStreamBody(data)) - def multiPart(name: String, data: () => InputStream): MultiPart = MultiPart(name, InputStreamSupplierBody(data)) // mandatory content type? def multiPart(name: String, data: File): MultiPart = MultiPart(name, FileBody(data), fileName = Some(data.getName)) def multiPart(name: String, data: Path): MultiPart = MultiPart(name, PathBody(data), fileName = Some(data.getFileName.toString)) @@ -101,15 +100,18 @@ package object sttp { def header(k: String, v: String): RequestTemplate[U] = this.copy(headers = headers + (k -> v)) + // automatically set the content type? - unless specified def data(b: String): RequestTemplate[U] = this.copy(body = StringBody(b)) def data(b: Array[Byte]): RequestTemplate[U] = this.copy(body = ByteArrayBody(b)) def data(b: ByteBuffer): RequestTemplate[U] = this.copy(body = ByteBufferBody(b)) def data(b: InputStream): RequestTemplate[U] = this.copy(body = InputStreamBody(b)) - def data(b: () => InputStream): RequestTemplate[U] = this.copy(body = InputStreamSupplierBody(b)) // mandatory content type? def data(b: File): RequestTemplate[U] = this.copy(body = FileBody(b)) def data(b: Path): RequestTemplate[U] = this.copy(body = PathBody(b)) + def data[T: BodySerializer](b: T): RequestTemplate[U] = this.copy(body = SerializableBody(implicitly[BodySerializer[T]], b)) + // add serializable / deserializable? + //def formData(fs: Map[String, Seq[String]]): RequestTemplate[U] = ??? def formData(fs: Map[String, String]): RequestTemplate[U] = ??? def formData(fs: (String, String)*): RequestTemplate[U] = ??? @@ -126,18 +128,6 @@ package object sttp { handler.send(this, responseAs) } - - def sendStream[R[_], S, T](contentType: String, stream: S, responseAs: ResponseAs[T])( - implicit handler: SttpStreamHandler[R, S], isRequest: IsRequest[U]): R[Response[T]] = { - - handler.sendStream(this, contentType, stream, responseAs) - } - - def sendStream[R[_], S](contentType: String, stream: S, responseAs: ResponseAsStream[S])( - implicit handler: SttpStreamHandler[R, S], isRequest: IsRequest[U]): R[Response[S]] = { - - handler.sendStream(this, contentType, stream, responseAs) - } } object RequestTemplate { |