aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-06-30 13:02:46 +0200
committeradamw <adam@warski.org>2017-06-30 13:02:46 +0200
commitb9776b71297d30f9a49e52ca0639e8bade9b7e05 (patch)
treea277c32e6061291de97e0d3fbe3da0a1d31a0646
parent3664508d913fa5787ec6144fee6a12ecfe2f14ae (diff)
downloadsttp-b9776b71297d30f9a49e52ca0639e8bade9b7e05.tar.gz
sttp-b9776b71297d30f9a49e52ca0639e8bade9b7e05.tar.bz2
sttp-b9776b71297d30f9a49e52ca0639e8bade9b7e05.zip
Simple send stream
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala59
-rw-r--r--akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala14
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala14
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala2
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/model/package.scala13
-rw-r--r--core/src/main/scala/com/softwaremill/sttp/package.scala18
6 files changed, 67 insertions, 53 deletions
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
index 483f0a7..0f08684 100644
--- a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/AkkaHttpSttpHandler.scala
@@ -4,6 +4,8 @@ import akka.actor.{ActorSystem, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpHeader.ParsingResult
import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.`Content-Type`
+import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
@@ -20,40 +22,19 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Fu
import as.dispatcher
override def send[T](r: Request, responseAs: ResponseAs[T]): Future[Response[T]] = {
- requestToAkka(r).flatMap(Http().singleRequest(_)).flatMap { hr =>
+ requestToAkka(r).flatMap(setBody(r, r.body, _)).flatMap(Http().singleRequest(_)).flatMap { hr =>
val code = hr.status.intValue()
bodyFromAkkaResponse(responseAs, hr).map(Response(code, _))
}
}
override def send(r: Request, responseAsStream: ResponseAsStream[Source[ByteString, Any]]): Future[Response[Source[ByteString, Any]]] = {
- requestToAkka(r).flatMap(Http().singleRequest(_)).map { hr =>
+ requestToAkka(r).flatMap(setBody(r, r.body, _)).flatMap(Http().singleRequest(_)).map { hr =>
val code = hr.status.intValue()
Response(code, hr.entity.dataBytes)
}
}
- override def sendStream[T](r: Request, contentType: String, stream: Source[ByteString, Any],
- responseAs: ResponseAs[T]): Future[Response[T]] = {
-
- for {
- ar <- requestToAkka(r)
- ct <- contentTypeToAkka(contentType)
- hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream)))
- body <- bodyFromAkkaResponse(responseAs, hr)
- } yield Response(hr.status.intValue(), body)
- }
-
- override def sendStream(r: Request, contentType: String, stream: Source[ByteString, Any],
- responseAsStream: ResponseAsStream[Source[ByteString, Any]]): Future[Response[Source[ByteString, Any]]] = {
-
- for {
- ar <- requestToAkka(r)
- ct <- contentTypeToAkka(contentType)
- hr <- Http().singleRequest(ar.withEntity(HttpEntity(ct, stream)))
- } yield Response(hr.status.intValue(), hr.entity.dataBytes)
- }
-
private def convertMethod(m: Method): HttpMethod = m match {
case Method.GET => HttpMethods.GET
case Method.POST => HttpMethods.POST
@@ -95,10 +76,34 @@ class AkkaHttpSttpHandler(actorSystem: ActorSystem) extends SttpStreamHandler[Fu
}
}
- private def contentTypeToAkka(ct: String): Future[ContentType] = {
- ContentType.parse(ct).fold(
- errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")),
- Future.successful)
+ private def setBody(r: Request, body: RequestBody, ar: HttpRequest): Future[HttpRequest] = body match {
+ case NoBody => Future.successful(ar)
+ case StringBody(b) => Future.successful(ar.withEntity(b))
+ case ByteArrayBody(b) => Future.successful(ar.withEntity(b))
+ case ByteBufferBody(b) => Future.successful(ar.withEntity(ByteString(b)))
+ case InputStreamBody(b) => Future.successful(ar) //TODO
+ case FileBody(b) => Future.successful(ar)//TODO
+ case PathBody(b) => Future.successful(ar) //TODO
+ case sb@SerializableBody(_, _) => setSerializableBody(r, sb, ar)
+ }
+
+ private def setSerializableBody[T](r: Request, body: SerializableBody[T], ar: HttpRequest): Future[HttpRequest] = body match {
+ case SerializableBody(SourceBodySerializer, t) =>
+ getContentTypeOrOctetStream(r).map(ct => ar.withEntity(HttpEntity(ct, t)))
+
+ case SerializableBody(f, t) => setBody(r, f(t), ar)
+ }
+
+ private def getContentTypeOrOctetStream(r: Request): Future[ContentType] = {
+ r.headers
+ .find(_._1.toLowerCase.contains(`Content-Type`.lowercaseName))
+ .map(_._2)
+ .map { ct =>
+ ContentType.parse(ct).fold(
+ errors => Future.failed(new RuntimeException(s"Cannot parse content type: $errors")),
+ Future.successful)
+ }
+ .getOrElse(Future.successful(`application/octet-stream`))
}
def close(): Future[Terminated] = {
diff --git a/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala
new file mode 100644
index 0000000..669af5e
--- /dev/null
+++ b/akka-http-handler/src/main/scala/com/softwaremill/sttp/akkahttp/package.scala
@@ -0,0 +1,14 @@
+package com.softwaremill.sttp
+
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import com.softwaremill.sttp.model.{BasicRequestBody, BodySerializer}
+
+package object akkahttp {
+ private[akkahttp] case object SourceBodySerializer extends BodySerializer[Source[ByteString, Any]] {
+ def apply(t: Source[ByteString, Any]): BasicRequestBody =
+ throw new RuntimeException("Can only be used with akka-http handler")
+ }
+
+ implicit val sourceBodySerializer: BodySerializer[Source[ByteString, Any]] = SourceBodySerializer
+}
diff --git a/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala
index b9c6af4..7ed7cc7 100644
--- a/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala
+++ b/core/src/main/scala/com/softwaremill/sttp/HttpConnectionSttpHandler.scala
@@ -16,14 +16,14 @@ object HttpConnectionSttpHandler extends SttpHandler[Id] {
c.setRequestMethod(r.method.m)
r.headers.foreach { case (k, v) => c.setRequestProperty(k, v) }
c.setDoInput(true)
- setBody(r, c)
+ setBody(r.body, c)
val status = c.getResponseCode
Response(status, readResponse(c.getInputStream, responseAs))
}
- private def setBody(r: Request, c: HttpURLConnection): Unit = {
- if (r.body != NoBody) c.setDoOutput(true)
+ private def setBody(body: RequestBody, c: HttpURLConnection): Unit = {
+ if (body != NoBody) c.setDoOutput(true)
def copyStream(in: InputStream, out: OutputStream): Unit = {
val buf = new Array[Byte](1024)
@@ -40,7 +40,7 @@ object HttpConnectionSttpHandler extends SttpHandler[Id] {
doCopy()
}
- r.body match {
+ body match {
case NoBody => // skip
case StringBody(b) =>
@@ -57,14 +57,14 @@ object HttpConnectionSttpHandler extends SttpHandler[Id] {
case InputStreamBody(b) =>
copyStream(b, c.getOutputStream)
- case InputStreamSupplierBody(b) =>
- copyStream(b(), c.getOutputStream)
-
case FileBody(b) =>
Files.copy(b.toPath, c.getOutputStream)
case PathBody(b) =>
Files.copy(b, c.getOutputStream)
+
+ case SerializableBody(f, t) =>
+ setBody(f(t), c)
}
}
diff --git a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala
index aaab92d..05c7199 100644
--- a/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala
+++ b/core/src/main/scala/com/softwaremill/sttp/SttpHandler.scala
@@ -10,6 +10,4 @@ trait SttpHandler[R[_]] {
trait SttpStreamHandler[R[_], S] extends SttpHandler[R] {
def send(request: Request, responseAsStream: ResponseAsStream[S]): R[Response[S]]
- def sendStream[T](request: Request, contentType: String, stream: S, responseAs: ResponseAs[T]): R[Response[T]]
- def sendStream(request: Request, contentType: String, stream: S, responseAsStream: ResponseAsStream[S]): R[Response[S]]
} \ No newline at end of file
diff --git a/core/src/main/scala/com/softwaremill/sttp/model/package.scala b/core/src/main/scala/com/softwaremill/sttp/model/package.scala
index 00f1261..df9ef1b 100644
--- a/core/src/main/scala/com/softwaremill/sttp/model/package.scala
+++ b/core/src/main/scala/com/softwaremill/sttp/model/package.scala
@@ -16,14 +16,21 @@ package object model {
val PATCH = Method("PATCH")
}
+ /**
+ * Provide an implicit value of this type to serialize arbitrary classes into a request body.
+ * Handlers might also provide special logic for serializer instances which they define (e.g. to handle streaming).
+ */
+ type BodySerializer[T] = T => BasicRequestBody
+
sealed trait RequestBody
- sealed trait BasicRequestBody extends RequestBody
case object NoBody extends RequestBody
+ case class SerializableBody[T](f: BodySerializer[T], t: T) extends RequestBody
+
+ sealed trait BasicRequestBody extends RequestBody
case class StringBody(s: String) extends BasicRequestBody
case class ByteArrayBody(b: Array[Byte]) extends BasicRequestBody
case class ByteBufferBody(b: ByteBuffer) extends BasicRequestBody
case class InputStreamBody(b: InputStream) extends BasicRequestBody
- case class InputStreamSupplierBody(b: () => InputStream) extends BasicRequestBody
case class FileBody(f: File) extends BasicRequestBody
case class PathBody(f: Path) extends BasicRequestBody
@@ -32,5 +39,5 @@ package object model {
case class ResponseAsString(encoding: String) extends ResponseAs[String]
object ResponseAsByteArray extends ResponseAs[Array[Byte]]
- case class ResponseAsStream[-S]()
+ case class ResponseAsStream[S]()
}
diff --git a/core/src/main/scala/com/softwaremill/sttp/package.scala b/core/src/main/scala/com/softwaremill/sttp/package.scala
index b007d6a..c787643 100644
--- a/core/src/main/scala/com/softwaremill/sttp/package.scala
+++ b/core/src/main/scala/com/softwaremill/sttp/package.scala
@@ -79,7 +79,6 @@ package object sttp {
def multiPart(name: String, data: Array[Byte]): MultiPart = MultiPart(name, ByteArrayBody(data))
def multiPart(name: String, data: ByteBuffer): MultiPart = MultiPart(name, ByteBufferBody(data))
def multiPart(name: String, data: InputStream): MultiPart = MultiPart(name, InputStreamBody(data))
- def multiPart(name: String, data: () => InputStream): MultiPart = MultiPart(name, InputStreamSupplierBody(data))
// mandatory content type?
def multiPart(name: String, data: File): MultiPart = MultiPart(name, FileBody(data), fileName = Some(data.getName))
def multiPart(name: String, data: Path): MultiPart = MultiPart(name, PathBody(data), fileName = Some(data.getFileName.toString))
@@ -101,15 +100,18 @@ package object sttp {
def header(k: String, v: String): RequestTemplate[U] = this.copy(headers = headers + (k -> v))
+ // automatically set the content type? - unless specified
def data(b: String): RequestTemplate[U] = this.copy(body = StringBody(b))
def data(b: Array[Byte]): RequestTemplate[U] = this.copy(body = ByteArrayBody(b))
def data(b: ByteBuffer): RequestTemplate[U] = this.copy(body = ByteBufferBody(b))
def data(b: InputStream): RequestTemplate[U] = this.copy(body = InputStreamBody(b))
- def data(b: () => InputStream): RequestTemplate[U] = this.copy(body = InputStreamSupplierBody(b))
// mandatory content type?
def data(b: File): RequestTemplate[U] = this.copy(body = FileBody(b))
def data(b: Path): RequestTemplate[U] = this.copy(body = PathBody(b))
+ def data[T: BodySerializer](b: T): RequestTemplate[U] = this.copy(body = SerializableBody(implicitly[BodySerializer[T]], b))
+ // add serializable / deserializable?
+ //def formData(fs: Map[String, Seq[String]]): RequestTemplate[U] = ???
def formData(fs: Map[String, String]): RequestTemplate[U] = ???
def formData(fs: (String, String)*): RequestTemplate[U] = ???
@@ -126,18 +128,6 @@ package object sttp {
handler.send(this, responseAs)
}
-
- def sendStream[R[_], S, T](contentType: String, stream: S, responseAs: ResponseAs[T])(
- implicit handler: SttpStreamHandler[R, S], isRequest: IsRequest[U]): R[Response[T]] = {
-
- handler.sendStream(this, contentType, stream, responseAs)
- }
-
- def sendStream[R[_], S](contentType: String, stream: S, responseAs: ResponseAsStream[S])(
- implicit handler: SttpStreamHandler[R, S], isRequest: IsRequest[U]): R[Response[S]] = {
-
- handler.sendStream(this, contentType, stream, responseAs)
- }
}
object RequestTemplate {