aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoradamw <adam@warski.org>2017-07-24 16:57:51 +0200
committeradamw <adam@warski.org>2017-07-24 16:57:51 +0200
commitb1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6 (patch)
tree6ef95abd69930cabb5b7566507af6dc56d25ebaf
parent95fee5083274bf0e856af8b868702f8965b92f1a (diff)
downloadsttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.gz
sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.tar.bz2
sttp-b1a539bd1fb5a5870c2e96c73f14e79b6caf4ff6.zip
Adding streaming to the monix async http client handler
-rw-r--r--README.md39
-rw-r--r--async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala32
-rw-r--r--async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala49
-rw-r--r--async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala13
-rw-r--r--async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala181
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala49
6 files changed, 315 insertions, 48 deletions
diff --git a/README.md b/README.md
index c5ca897..b843226 100644
--- a/README.md
+++ b/README.md
@@ -289,7 +289,44 @@ implicit val sttpHandler = FutureAsyncHttpClientHandler.usingConfig(asyncHttpCli
implicit val sttpHandler = FutureAsyncHttpClientHandler.usingClient(asyncHttpClient)
```
-Streaming is not (yet) supported.
+#### Streaming using Monix
+
+Currently, only the Monix handler supports streaming (as both Monix and Async
+Http Client support reactive streams `Publisher`s out of the box). The type of
+supported streams in this case is `Observable[ByteBuffer]`. That is, you can
+set such an observable as a request body:
+
+```scala
+import com.softwaremill.sttp._
+
+import java.nio.ByteBuffer
+import monix.reactive.Observable
+
+val obs: Observable[ByteBuffer] = ...
+
+sttp
+ .streamBody(obs)
+ .post(uri"...")
+```
+
+And receive responses as an observable stream:
+
+```scala
+import com.softwaremill.sttp._
+import com.softwaremill.sttp.asynchttpclient.monix._
+
+import java.nio.ByteBuffer
+import monix.eval.Task
+import monix.reactive.Observable
+
+implicit val sttpHandler = MonixAsyncHttpClientHandler()
+
+val response: Task[Response[Observable[ByteBuffer]]] =
+ sttp
+ .post(uri"...")
+ .response(asStream[Observable[ByteBuffer]])
+ .send()
+```
## Request type
diff --git a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
index adc679e..a2e49a2 100644
--- a/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/future/src/main/scala/com/softwaremill/sttp/asynchttpclient/future/FutureAsyncHttpClientHandler.scala
@@ -1,5 +1,7 @@
package com.softwaremill.sttp.asynchttpclient.future
+import java.nio.ByteBuffer
+
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
MonadAsyncError
@@ -9,22 +11,50 @@ import org.asynchttpclient.{
AsyncHttpClientConfig,
DefaultAsyncHttpClient
}
+import org.reactivestreams.Publisher
import scala.concurrent.{ExecutionContext, Future, Promise}
class FutureAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)(
implicit ec: ExecutionContext)
- extends AsyncHttpClientHandler[Future](asyncHttpClient, new FutureMonad())
+ extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient,
+ new FutureMonad()) {
+
+ override protected def streamBodyToPublisher(
+ s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Nothing =
+ throw new IllegalStateException("This handler does not support streaming")
+}
object FutureAsyncHttpClientHandler {
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
def apply()(
implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
: FutureAsyncHttpClientHandler =
new FutureAsyncHttpClientHandler(new DefaultAsyncHttpClient())
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
def usingConfig(cfg: AsyncHttpClientConfig)(
implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
: FutureAsyncHttpClientHandler =
new FutureAsyncHttpClientHandler(new DefaultAsyncHttpClient())
+
+ /**
+ * @param ec The execution context for running non-network related operations,
+ * e.g. mapping responses. Defaults to the global execution
+ * context.
+ */
def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext =
ExecutionContext.Implicits.global)
: FutureAsyncHttpClientHandler =
diff --git a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
index fab9c98..c77e6d9 100644
--- a/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/monix/src/main/scala/com/softwaremill/sttp/asynchttpclient/monix/MonixAsyncHttpClientHandler.scala
@@ -1,32 +1,65 @@
package com.softwaremill.sttp.asynchttpclient.monix
+import java.nio.ByteBuffer
+
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
MonadAsyncError
}
import monix.eval.Task
-import monix.execution.Cancelable
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.Observable
import org.asynchttpclient.{
AsyncHttpClient,
AsyncHttpClientConfig,
DefaultAsyncHttpClient
}
+import org.reactivestreams.Publisher
import scala.util.{Failure, Success}
-class MonixAsyncHttpClientHandler(asyncHttpClient: AsyncHttpClient)
- extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad) {
+class MonixAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)(
+ implicit scheduler: Scheduler)
+ extends AsyncHttpClientHandler[Task, Observable[ByteBuffer]](
+ asyncHttpClient,
+ TaskMonad) {
+
+ override protected def streamBodyToPublisher(
+ s: Observable[ByteBuffer]): Publisher[ByteBuffer] = {
+ s.toReactivePublisher
+ }
- def this() = this(new DefaultAsyncHttpClient())
- def this(cfg: AsyncHttpClientConfig) = this(new DefaultAsyncHttpClient(cfg))
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Observable[ByteBuffer] =
+ Observable.fromReactivePublisher(p)
}
object MonixAsyncHttpClientHandler {
- def apply(): MonixAsyncHttpClientHandler =
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def apply()(implicit s: Scheduler = Scheduler.Implicits.global)
+ : MonixAsyncHttpClientHandler =
new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient())
- def usingConfig(cfg: AsyncHttpClientConfig): MonixAsyncHttpClientHandler =
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingConfig(cfg: AsyncHttpClientConfig)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : MonixAsyncHttpClientHandler =
new MonixAsyncHttpClientHandler(new DefaultAsyncHttpClient())
- def usingClient(client: AsyncHttpClient): MonixAsyncHttpClientHandler =
+
+ /**
+ * @param s The scheduler used for streaming request bodies. Defaults to the
+ * global scheduler.
+ */
+ def usingClient(client: AsyncHttpClient)(implicit s: Scheduler =
+ Scheduler.Implicits.global)
+ : MonixAsyncHttpClientHandler =
new MonixAsyncHttpClientHandler(client)
}
diff --git a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
index cb3bdef..0460fff 100644
--- a/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
+++ b/async-http-client-handler/scalaz/src/main/scala/com/softwaremill/sttp/asynchttpclient/scalaz/ScalazAsyncHttpClientHandler.scala
@@ -1,5 +1,7 @@
package com.softwaremill.sttp.asynchttpclient.scalaz
+import java.nio.ByteBuffer
+
import com.softwaremill.sttp.asynchttpclient.{
AsyncHttpClientHandler,
MonadAsyncError
@@ -9,12 +11,21 @@ import org.asynchttpclient.{
AsyncHttpClientConfig,
DefaultAsyncHttpClient
}
+import org.reactivestreams.Publisher
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
class ScalazAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient)
- extends AsyncHttpClientHandler[Task](asyncHttpClient, TaskMonad)
+ extends AsyncHttpClientHandler[Task, Nothing](asyncHttpClient, TaskMonad) {
+
+ override protected def streamBodyToPublisher(
+ s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
+
+ override protected def publisherToStreamBody(
+ p: Publisher[ByteBuffer]): Nothing =
+ throw new IllegalStateException("This handler does not support streaming")
+}
object ScalazAsyncHttpClientHandler {
def apply(): ScalazAsyncHttpClientHandler =
diff --git a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
index 1683c3d..f89c85a 100644
--- a/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
+++ b/async-http-client-handler/src/main/scala/com/softwaremill/sttp/asynchttpclient/AsyncHttpClientHandler.scala
@@ -1,46 +1,143 @@
package com.softwaremill.sttp.asynchttpclient
+import java.nio.ByteBuffer
import java.nio.charset.Charset
import com.softwaremill.sttp.model._
-import com.softwaremill.sttp.{Request, Response, SttpHandler}
+import com.softwaremill.sttp.{
+ Request,
+ Response,
+ SttpHandler,
+ ContentLengthHeader
+}
+import org.asynchttpclient.AsyncHandler.State
+import org.asynchttpclient.handler.StreamedAsyncHandler
import org.asynchttpclient.{
AsyncCompletionHandler,
+ AsyncHandler,
AsyncHttpClient,
+ HttpResponseBodyPart,
+ HttpResponseHeaders,
+ HttpResponseStatus,
RequestBuilder,
Request => AsyncRequest,
Response => AsyncResponse
}
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
import scala.collection.JavaConverters._
import scala.language.higherKinds
-class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
- rm: MonadAsyncError[R])
- extends SttpHandler[R, Nothing] {
+abstract class AsyncHttpClientHandler[R[_], S](
+ asyncHttpClient: AsyncHttpClient,
+ rm: MonadAsyncError[R])
+ extends SttpHandler[R, S] {
+
+ override def send[T](r: Request[T, S]): R[Response[T]] = {
+ val preparedRequest = asyncHttpClient
+ .prepareRequest(requestToAsync(r))
- override def send[T](r: Request[T, Nothing]): R[Response[T]] = {
rm.flatten(rm.async[R[Response[T]]] { cb =>
- asyncHttpClient
- .prepareRequest(requestToAsync(r))
- .execute(new AsyncCompletionHandler[AsyncResponse] {
- override def onCompleted(response: AsyncResponse): AsyncResponse = {
- cb(Right(readResponse(response, r.responseAs)))
- response
- }
- override def onThrowable(t: Throwable): Unit = cb(Left(t))
- })
+ def success(r: R[Response[T]]) = cb(Right(r))
+ def error(t: Throwable) = cb(Left(t))
+
+ r.responseAs match {
+ case ras @ ResponseAsStream() =>
+ preparedRequest
+ .execute(streamingAsyncHandler(ras, success, error))
+
+ case ra =>
+ preparedRequest
+ .execute(eagerAsyncHandler(ra, success, error))
+ }
+
})
+
}
- private def requestToAsync(r: Request[_, Nothing]): AsyncRequest = {
+ protected def streamBodyToPublisher(s: S): Publisher[ByteBuffer]
+
+ protected def publisherToStreamBody(p: Publisher[ByteBuffer]): S
+
+ private def eagerAsyncHandler[T](
+ responseAs: ResponseAs[T, S],
+ success: R[Response[T]] => Unit,
+ error: Throwable => Unit): AsyncHandler[Unit] = {
+
+ new AsyncCompletionHandler[Unit] {
+ override def onCompleted(response: AsyncResponse): Unit =
+ success(readEagerResponse(response, responseAs))
+
+ override def onThrowable(t: Throwable): Unit = error(t)
+ }
+ }
+
+ private def streamingAsyncHandler[T](
+ responseAs: ResponseAsStream[T, S],
+ success: R[Response[T]] => Unit,
+ error: Throwable => Unit): AsyncHandler[Unit] = {
+ new StreamedAsyncHandler[Unit] {
+ private val builder = new AsyncResponse.ResponseBuilder()
+ private var publisher: Option[Publisher[ByteBuffer]] = None
+
+ override def onStream(
+ p: Publisher[HttpResponseBodyPart]): AsyncHandler.State = {
+ // Sadly we don't have .map on Publisher
+ publisher = Some(new Publisher[ByteBuffer] {
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit =
+ p.subscribe(new Subscriber[HttpResponseBodyPart] {
+ override def onError(t: Throwable): Unit = s.onError(t)
+ override def onComplete(): Unit = s.onComplete()
+ override def onNext(t: HttpResponseBodyPart): Unit =
+ s.onNext(t.getBodyByteBuffer)
+ override def onSubscribe(v: Subscription): Unit =
+ s.onSubscribe(v)
+ })
+ })
+ State.CONTINUE
+ }
+
+ override def onBodyPartReceived(
+ bodyPart: HttpResponseBodyPart): AsyncHandler.State =
+ throw new IllegalStateException(
+ "Requested a streaming handler, unexpected eager body parts.")
+
+ override def onHeadersReceived(
+ headers: HttpResponseHeaders): AsyncHandler.State = {
+ builder.accumulate(headers)
+ State.CONTINUE
+ }
+
+ override def onStatusReceived(
+ responseStatus: HttpResponseStatus): AsyncHandler.State = {
+ builder.accumulate(responseStatus)
+ State.CONTINUE
+ }
+
+ override def onCompleted(): Unit = {
+ val baseResponse = readResponseNoBody(builder.build())
+ val p = publisher.getOrElse(EmptyPublisher)
+ val s = publisherToStreamBody(p)
+ val t = responseAs.responseIsStream(s)
+ success(rm.unit(baseResponse.copy(body = t)))
+ }
+
+ override def onThrowable(t: Throwable): Unit = {
+ error(t)
+ }
+ }
+ }
+
+ private def requestToAsync(r: Request[_, S]): AsyncRequest = {
val rb = new RequestBuilder(r.method.m).setUrl(r.uri.toString)
r.headers.foreach { case (k, v) => rb.setHeader(k, v) }
- setBody(r.body, rb)
+ setBody(r, r.body, rb)
rb.build()
}
- private def setBody(body: RequestBody[Nothing], rb: RequestBuilder): Unit = {
+ private def setBody(r: Request[_, S],
+ body: RequestBody[S],
+ rb: RequestBuilder): Unit = {
body match {
case NoBody => // skip
@@ -60,36 +157,42 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
rb.setBody(b.toFile)
case SerializableBody(f, t) =>
- setBody(f(t), rb)
+ setBody(r, f(t), rb)
case StreamBody(s) =>
- // we have an instance of nothing - everything's possible!
- s
+ val cl = r.headers
+ .find(_._1.equalsIgnoreCase(ContentLengthHeader))
+ .map(_._2.toLong)
+ .getOrElse(-1L)
+ rb.setBody(streamBodyToPublisher(s), cl)
}
}
- private def readResponse[T](
+ private def readEagerResponse[T](
response: AsyncResponse,
- responseAs: ResponseAs[T, Nothing]): R[Response[T]] = {
- val body = readResponseBody(response, responseAs)
- rm.map(body,
- Response(_: T,
- response.getStatusCode,
- response.getHeaders
- .iterator()
- .asScala
- .map(e => (e.getKey, e.getValue))
- .toList))
+ responseAs: ResponseAs[T, S]): R[Response[T]] = {
+ val body = readEagerResponseBody(response, responseAs)
+ rm.map(body, (b: T) => readResponseNoBody(response).copy(body = b))
}
- private def readResponseBody[T](response: AsyncResponse,
- responseAs: ResponseAs[T, Nothing]): R[T] = {
+ private def readResponseNoBody(response: AsyncResponse): Response[Unit] = {
+ Response((),
+ response.getStatusCode,
+ response.getHeaders
+ .iterator()
+ .asScala
+ .map(e => (e.getKey, e.getValue))
+ .toList)
+ }
+
+ private def readEagerResponseBody[T](response: AsyncResponse,
+ responseAs: ResponseAs[T, S]): R[T] = {
def asString(enc: String) = response.getResponseBody(Charset.forName(enc))
responseAs match {
case MappedResponseAs(raw, g) =>
- rm.map(readResponseBody(response, raw), g)
+ rm.map(readEagerResponseBody(response, raw), g)
case IgnoreResponse =>
// getting the body and discarding it
@@ -108,7 +211,9 @@ class AsyncHttpClientHandler[R[_]](asyncHttpClient: AsyncHttpClient,
case ResponseAsStream() =>
// only possible when the user requests the response as a stream of
// Nothing. Oh well ...
- rm.error(new IllegalStateException())
+ rm.error(
+ new IllegalStateException(
+ "Requested a streaming response, trying to read eagerly."))
}
}
}
@@ -122,3 +227,9 @@ trait MonadAsyncError[R[_]] {
def flatten[T](ffa: R[R[T]]): R[T] = flatMap[R[T], T](ffa, identity)
}
+
+object EmptyPublisher extends Publisher[ByteBuffer] {
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {
+ s.onComplete()
+ }
+}
diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
index a67be2e..5e6db17 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
@@ -1,11 +1,15 @@
package com.softwaremill.sttp
+import java.nio.ByteBuffer
+
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler
+import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
import com.typesafe.scalalogging.StrictLogging
+import monix.reactive.Observable
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
@@ -32,12 +36,13 @@ class StreamingTests
override def port = 51824
akkaStreamingTests()
+ monixStreamingTests()
+
+ val body = "streaming test"
def akkaStreamingTests(): Unit = {
implicit val handler = AkkaHttpSttpHandler.usingActorSystem(actorSystem)
- val body = "streaming test"
-
"Akka HTTP" should "stream request body" in {
val response = sttp
.post(uri"$endpoint/echo")
@@ -61,4 +66,44 @@ class StreamingTests
responseBody should be(body)
}
}
+
+ def monixStreamingTests(): Unit = {
+ implicit val handler = MonixAsyncHttpClientHandler()
+ import monix.execution.Scheduler.Implicits.global
+
+ val body = "streaming test"
+
+ "Monix Async Http Client" should "stream request body" in {
+ val source = Observable.fromIterable(
+ body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
+
+ val response = sttp
+ .post(uri"$endpoint/echo")
+ .streamBody(source)
+ .send()
+ .runAsync
+ .futureValue
+
+ response.body should be(body)
+ }
+
+ it should "receive a stream" in {
+ val response = sttp
+ .post(uri"$endpoint/echo")
+ .body(body)
+ .response(asStream[Observable[ByteBuffer]])
+ .send()
+ .runAsync
+ .futureValue
+
+ val bytes = response.body
+ .flatMap(bb => Observable.fromIterable(bb.array()))
+ .toListL
+ .runAsync
+ .futureValue
+ .toArray
+
+ new String(bytes, "utf-8") should be(body)
+ }
+ }
}