diff options
6 files changed, 207 insertions, 37 deletions
@@ -190,7 +190,8 @@ uri"$scheme://$subdomains.example.com?x=$vx&$params#$jumpTo" | `MonixAsyncHttpClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` | | `CatsAsyncHttpClientHandler` | `F[_]: cats.effect.Async` | - | | `OkHttpSyncClientHandler` | None (`Id`) | - | -| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - | +| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - | +| `OkHttpMonixClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` | ### `HttpURLConnectionSttpHandler` @@ -32,7 +32,12 @@ val commonSettings = Seq( val akkaHttpVersion = "10.0.9" val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion + +val monixVersion = "2.3.0" +val monix = "io.monix" %% "monix" % monixVersion + val circeVersion = "0.8.0" + val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3" lazy val rootProject = (project in file(".")) @@ -47,6 +52,7 @@ lazy val rootProject = (project in file(".")) monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, + okhttpMonixClientHandler, circe, tests ) @@ -102,9 +108,7 @@ lazy val monixAsyncHttpClientHandler: Project = (project in file( .settings(commonSettings: _*) .settings( name := "async-http-client-handler-monix", - libraryDependencies ++= Seq( - "io.monix" %% "monix" % "2.3.0" - ) + libraryDependencies ++= Seq(monix) ) dependsOn asyncHttpClientHandler lazy val catsAsyncHttpClientHandler: Project = (project in file( @@ -127,6 +131,14 @@ lazy val okhttpClientHandler: Project = (project in file( ) ) dependsOn core +lazy val okhttpMonixClientHandler: Project = (project in file( + "okhttp-client-handler/monix")) + .settings(commonSettings: _*) + .settings( + name := "okhttp-client-handler-monix", + libraryDependencies ++= Seq(monix) + ) dependsOn okhttpClientHandler + lazy val circe: Project = (project in file("circe")) .settings(commonSettings: _*) .settings( @@ -152,4 +164,4 @@ lazy val tests: Project = (project in file("tests")) ).map(_ % "test"), libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test" ) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler, -monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler) +monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler) diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala new file mode 100644 index 0000000..e792c44 --- /dev/null +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -0,0 +1,79 @@ +package com.softwaremill.sttp.okhttp.monix + +import java.nio.ByteBuffer + +import com.softwaremill.sttp._ +import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler +import monix.eval.Task +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.{Consumer, Observable} +import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} +import okio.BufferedSink + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.language.higherKinds +import scala.util.{Failure, Success, Try} + +/** + * Created by omainegra on 8/4/17. + */ +class OkHttpMonixClientHandler private (client: OkHttpClient)( + implicit s: Scheduler) + extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client, + TaskMonad) { + + private lazy val io = Scheduler.io("sttp-monix-io") + + override def streamToRequestBody( + stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = + Some(new OkHttpRequestBody() { + override def writeTo(sink: BufferedSink): Unit = { + val f = stream + .consumeWith( + Consumer.foreach(chunk => sink.write(chunk.array())) + ) + .runAsync(io) + + // We could safely block until the observable is consumed because OkHttp execute + // this method asynchronous in another ThreadPool. + Await.ready(f, Duration.Inf) + } + + override def contentType(): MediaType = null + }) + + override def responseBodyToStream( + res: okhttp3.Response): Try[Observable[ByteBuffer]] = + Success( + Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap)) +} + +object OkHttpMonixClientHandler { + def apply(okhttpClient: OkHttpClient = new OkHttpClient())( + implicit s: Scheduler = Scheduler.Implicits.global) + : OkHttpMonixClientHandler = + new OkHttpMonixClientHandler(okhttpClient)(s) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala index f7c4466..8f21eb0 100644 --- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala +++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala @@ -19,7 +19,7 @@ import okhttp3.{ import okio.{BufferedSink, Okio} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds import scala.util.{Failure, Try} @@ -61,13 +61,14 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) override def contentType(): MediaType = null }) case PathBody(b, _) => Some(OkHttpRequestBody.create(null, b.toFile)) - case StreamBody(s) => None + case StreamBody(s) => streamToRequestBody(s) } } private[okhttp] def readResponse[T]( res: OkHttpResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = responseHandler(res).handle(responseAs, responseMonad) val headers = res @@ -87,12 +88,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) case ResponseAsString(encoding) => Try(res.body().source().readString(Charset.forName(encoding))) case ResponseAsByteArray => Try(res.body().bytes()) - case ResponseAsStream() => - Failure(new IllegalStateException("Streaming isn't supported")) + case ras @ ResponseAsStream() => + responseBodyToStream(res).map(ras.responseIsStream) case ResponseAsFile(file, overwrite) => Try(ResponseAs.saveFile(file, res.body().byteStream(), overwrite)) } } + + def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None + + def responseBodyToStream(res: OkHttpResponse): Try[S] = + Failure(new IllegalStateException("Streaming isn't supported")) } class OkHttpSyncClientHandler private (client: OkHttpClient) @@ -108,38 +114,43 @@ class OkHttpSyncClientHandler private (client: OkHttpClient) object OkHttpSyncClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : SttpHandler[Id, Nothing] = + : OkHttpSyncClientHandler = new OkHttpSyncClientHandler(okhttpClient) } -class OkHttpFutureClientHandler private (client: OkHttpClient)( - implicit ec: ExecutionContext) - extends OkHttpClientHandler[Future, Nothing](client) { - - override def send[T](r: Request[T, Nothing]): Future[Response[T]] = { +abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient, + rm: MonadAsyncError[R]) + extends OkHttpClientHandler[R, S](client) { + override def send[T](r: Request[T, S]): R[Response[T]] = { val request = convertRequest(r) - val promise = Promise[Future[Response[T]]]() - - client - .newCall(request) - .enqueue(new Callback { - override def onFailure(call: Call, e: IOException): Unit = - promise.failure(e) - override def onResponse(call: Call, response: OkHttpResponse): Unit = - try promise.success(readResponse(response, r.responseAs)) - catch { case e: Exception => promise.failure(e) } - }) - - responseMonad.flatten(promise.future) + rm.flatten(rm.async[R[Response[T]]] { cb => + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) + + client + .newCall(request) + .enqueue(new Callback { + override def onFailure(call: Call, e: IOException): Unit = + error(e) + + override def onResponse(call: Call, response: OkHttpResponse): Unit = + try success(readResponse(response, r.responseAs)) + catch { case e: Exception => error(e) } + }) + }) } - override def responseMonad: MonadError[Future] = new FutureMonad + override def responseMonad: MonadError[R] = rm } +class OkHttpFutureClientHandler private (client: OkHttpClient)( + implicit ec: ExecutionContext) + extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {} + object OkHttpFutureClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient())( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = + : OkHttpFutureClientHandler = new OkHttpFutureClientHandler(okhttpClient) } diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala index 1c0d3cd..9b64271 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala @@ -21,6 +21,7 @@ import com.softwaremill.sttp.asynchttpclient.cats.CatsAsyncHttpClientHandler import com.softwaremill.sttp.asynchttpclient.future.FutureAsyncHttpClientHandler import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler import com.softwaremill.sttp.asynchttpclient.scalaz.ScalazAsyncHttpClientHandler +import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler import com.softwaremill.sttp.okhttp.{ OkHttpFutureClientHandler, OkHttpSyncClientHandler @@ -156,8 +157,10 @@ class BasicTests ForceWrappedValue.catsIo) runTests("OkHttpSyncClientHandler")(OkHttpSyncClientHandler(), ForceWrappedValue.id) - runTests("OkHttpSyncClientHandler - Future")(OkHttpFutureClientHandler(), - ForceWrappedValue.future) + runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureClientHandler(), + ForceWrappedValue.future) + runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixClientHandler(), + ForceWrappedValue.monixTask) def runTests[R[_]](name: String)( implicit handler: SttpHandler[R, Nothing], diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index 5f04126..5820153 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl.Source import akka.util.ByteString import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler +import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler import com.typesafe.scalalogging.StrictLogging import monix.reactive.Observable import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} @@ -36,10 +37,12 @@ class StreamingTests override def port = 51824 val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem) - val monixHandler = MonixAsyncHttpClientHandler() + val monixAsyncHttpClient = MonixAsyncHttpClientHandler() + val monixOkHttpClient = OkHttpMonixClientHandler() akkaStreamingTests() - monixStreamingTests() + monixAsyncHttpClientStreamingTests() + monixOkHttpClientStreamingTests() val body = "streaming test" @@ -70,8 +73,8 @@ class StreamingTests } } - def monixStreamingTests(): Unit = { - implicit val handler = monixHandler + def monixOkHttpClientStreamingTests(): Unit = { + implicit val handler = monixOkHttpClient import monix.execution.Scheduler.Implicits.global val body = "streaming test" @@ -131,9 +134,70 @@ class StreamingTests } } + def monixAsyncHttpClientStreamingTests(): Unit = { + implicit val handler = monixAsyncHttpClient + import monix.execution.Scheduler.Implicits.global + + val body = "streaming test" + + "Monix OkHttp Client" should "stream request body" in { + val source = Observable.fromIterable( + body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) + + val response = sttp + .post(uri"$endpoint/echo") + .streamBody(source) + .send() + .runAsync + .futureValue + + response.body should be(body) + } + + it should "receive a stream" in { + val response = sttp + .post(uri"$endpoint/echo") + .body(body) + .response(asStream[Observable[ByteBuffer]]) + .send() + .runAsync + .futureValue + + val bytes = response.body + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .runAsync + .futureValue + .toArray + + new String(bytes, "utf-8") should be(body) + } + + it should "receive a stream from an https site" in { + val response = sttp + // of course, you should never rely on the internet being available + // in tests, but that's so much easier than setting up an https + // testing server + .get(uri"https://softwaremill.com") + .response(asStream[Observable[ByteBuffer]]) + .send() + .runAsync + .futureValue + + val bytes = response.body + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .runAsync + .futureValue + .toArray + + new String(bytes, "utf-8") should include("</div>") + } + } + override protected def afterAll(): Unit = { akkaHandler.close() - monixHandler.close() + monixAsyncHttpClient.close() super.afterAll() } } |