diff options
author | Adam Warski <adam@warski.org> | 2017-08-28 16:45:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-28 16:45:33 +0200 |
commit | f1b4a997b0ccda703971a3baa0eff007097cb0da (patch) | |
tree | a3c8b50bf3c613ae0dade6b9778b3e6b393d7846 | |
parent | 589820beefd742e8b01f337a49e060ef9b2be3ea (diff) | |
parent | 8733320e18faa4d2d2d66a96de503469fc236e46 (diff) | |
download | sttp-f1b4a997b0ccda703971a3baa0eff007097cb0da.tar.gz sttp-f1b4a997b0ccda703971a3baa0eff007097cb0da.tar.bz2 sttp-f1b4a997b0ccda703971a3baa0eff007097cb0da.zip |
Merge pull request #24 from omainegra/okhttp3-monix
Okhttp3 monix
6 files changed, 231 insertions, 107 deletions
@@ -190,7 +190,8 @@ uri"$scheme://$subdomains.example.com?x=$vx&$params#$jumpTo" | `MonixAsyncHttpClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` | | `CatsAsyncHttpClientHandler` | `F[_]: cats.effect.Async` | - | | `OkHttpSyncClientHandler` | None (`Id`) | - | -| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - | +| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - | +| `OkHttpMonixClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` | ### `HttpURLConnectionSttpHandler` @@ -32,7 +32,12 @@ val commonSettings = Seq( val akkaHttpVersion = "10.0.9" val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion + +val monixVersion = "2.3.0" +val monix = "io.monix" %% "monix" % monixVersion + val circeVersion = "0.8.0" + val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3" lazy val rootProject = (project in file(".")) @@ -47,6 +52,7 @@ lazy val rootProject = (project in file(".")) monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, + okhttpMonixClientHandler, circe, tests ) @@ -102,9 +108,7 @@ lazy val monixAsyncHttpClientHandler: Project = (project in file( .settings(commonSettings: _*) .settings( name := "async-http-client-handler-monix", - libraryDependencies ++= Seq( - "io.monix" %% "monix" % "2.3.0" - ) + libraryDependencies ++= Seq(monix) ) dependsOn asyncHttpClientHandler lazy val catsAsyncHttpClientHandler: Project = (project in file( @@ -127,6 +131,14 @@ lazy val okhttpClientHandler: Project = (project in file( ) ) dependsOn core +lazy val okhttpMonixClientHandler: Project = (project in file( + "okhttp-client-handler/monix")) + .settings(commonSettings: _*) + .settings( + name := "okhttp-client-handler-monix", + libraryDependencies ++= Seq(monix) + ) dependsOn okhttpClientHandler + lazy val circe: Project = (project in file("circe")) .settings(commonSettings: _*) .settings( @@ -152,4 +164,4 @@ lazy val tests: Project = (project in file("tests")) ).map(_ % "test"), libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test" ) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler, -monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler) +monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler) diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala new file mode 100644 index 0000000..1401dee --- /dev/null +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -0,0 +1,108 @@ +package com.softwaremill.sttp.okhttp.monix + +import java.nio.ByteBuffer +import java.util.concurrent.ArrayBlockingQueue + +import com.softwaremill.sttp.{SttpHandler, _} +import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler +import monix.eval.Task +import monix.execution.Ack.Continue +import monix.execution.{Ack, Cancelable, Scheduler} +import monix.reactive.Observable +import monix.reactive.observers.Subscriber +import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} +import okio.BufferedSink + +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} + +/** + * Created by omainegra on 8/4/17. + */ +class OkHttpMonixClientHandler private (client: OkHttpClient)( + implicit s: Scheduler) + extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client, + TaskMonad) { + + override def streamToRequestBody( + stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = + Some(new OkHttpRequestBody() { + override def writeTo(sink: BufferedSink): Unit = + toIterable(stream) map (_.array()) foreach sink.write + override def contentType(): MediaType = null + }) + + override def responseBodyToStream( + res: okhttp3.Response): Try[Observable[ByteBuffer]] = + Success( + Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap)) + + private def toIterable[T](observable: Observable[T])( + implicit s: Scheduler): Iterable[T] = + new Iterable[T] { + override def iterator: Iterator[T] = new Iterator[T] { + case object Completed extends Exception + + val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1) + + observable.executeWithFork.subscribe(new Subscriber[T] { + override implicit def scheduler: Scheduler = s + + override def onError(ex: Throwable): Unit = { + blockingQueue.put(Left(ex)) + } + + override def onComplete(): Unit = { + blockingQueue.put(Left(Completed)) + } + + override def onNext(elem: T): Future[Ack] = { + blockingQueue.put(Right(elem)) + Continue + } + }) + + var value: T = _ + + override def hasNext: Boolean = + blockingQueue.take() match { + case Left(Completed) => false + case Right(elem) => + value = elem + true + case Left(ex) => throw ex + } + + override def next(): T = value + } + } +} + +object OkHttpMonixClientHandler { + def apply(okhttpClient: OkHttpClient = new OkHttpClient())( + implicit s: Scheduler = Scheduler.Implicits.global) + : SttpHandler[Task, Observable[ByteBuffer]] = + new OkHttpMonixClientHandler(okhttpClient)(s) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala index f7c4466..00ea10c 100644 --- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala +++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala @@ -19,7 +19,7 @@ import okhttp3.{ import okio.{BufferedSink, Okio} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds import scala.util.{Failure, Try} @@ -29,7 +29,7 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) val builder = new OkHttpRequest.Builder() .url(request.uri.toString) - val body = setBody(request.body) + val body = setBody(request) builder.method(request.method.m, body.getOrElse { if (HttpMethod.requiresRequestBody(request.method.m)) OkHttpRequestBody.create(null, "") @@ -46,12 +46,13 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) builder.build() } - private def setBody(requestBody: RequestBody[S]): Option[OkHttpRequestBody] = { - requestBody match { + private def setBody[T](request: Request[T, S]): Option[OkHttpRequestBody] = { + request.body match { case NoBody => None - case StringBody(b, encoding, _) => - Some(OkHttpRequestBody.create(MediaType.parse(encoding), b)) - case ByteArrayBody(b, _) => Some(OkHttpRequestBody.create(null, b)) + case StringBody(b, _, _) => + Some(OkHttpRequestBody.create(null, b)) + case ByteArrayBody(b, _) => + Some(OkHttpRequestBody.create(null, b)) case ByteBufferBody(b, _) => Some(OkHttpRequestBody.create(null, b.array())) case InputStreamBody(b, _) => @@ -60,14 +61,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) sink.writeAll(Okio.source(b)) override def contentType(): MediaType = null }) - case PathBody(b, _) => Some(OkHttpRequestBody.create(null, b.toFile)) - case StreamBody(s) => None + case PathBody(b, _) => + Some(OkHttpRequestBody.create(null, b.toFile)) + case StreamBody(s) => + streamToRequestBody(s) } } private[okhttp] def readResponse[T]( res: OkHttpResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = responseHandler(res).handle(responseAs, responseMonad) val headers = res @@ -87,12 +91,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) case ResponseAsString(encoding) => Try(res.body().source().readString(Charset.forName(encoding))) case ResponseAsByteArray => Try(res.body().bytes()) - case ResponseAsStream() => - Failure(new IllegalStateException("Streaming isn't supported")) + case ras @ ResponseAsStream() => + responseBodyToStream(res).map(ras.responseIsStream) case ResponseAsFile(file, overwrite) => Try(ResponseAs.saveFile(file, res.body().byteStream(), overwrite)) } } + + def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None + + def responseBodyToStream(res: OkHttpResponse): Try[S] = + Failure(new IllegalStateException("Streaming isn't supported")) } class OkHttpSyncClientHandler private (client: OkHttpClient) @@ -108,35 +117,40 @@ class OkHttpSyncClientHandler private (client: OkHttpClient) object OkHttpSyncClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : SttpHandler[Id, Nothing] = + : OkHttpSyncClientHandler = new OkHttpSyncClientHandler(okhttpClient) } -class OkHttpFutureClientHandler private (client: OkHttpClient)( - implicit ec: ExecutionContext) - extends OkHttpClientHandler[Future, Nothing](client) { - - override def send[T](r: Request[T, Nothing]): Future[Response[T]] = { +abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient, + rm: MonadAsyncError[R]) + extends OkHttpClientHandler[R, S](client) { + override def send[T](r: Request[T, S]): R[Response[T]] = { val request = convertRequest(r) - val promise = Promise[Future[Response[T]]]() - client - .newCall(request) - .enqueue(new Callback { - override def onFailure(call: Call, e: IOException): Unit = - promise.failure(e) + rm.flatten(rm.async[R[Response[T]]] { cb => + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) - override def onResponse(call: Call, response: OkHttpResponse): Unit = - try promise.success(readResponse(response, r.responseAs)) - catch { case e: Exception => promise.failure(e) } - }) + client + .newCall(request) + .enqueue(new Callback { + override def onFailure(call: Call, e: IOException): Unit = + error(e) - responseMonad.flatten(promise.future) + override def onResponse(call: Call, response: OkHttpResponse): Unit = + try success(readResponse(response, r.responseAs)) + catch { case e: Exception => error(e) } + }) + }) } - override def responseMonad: MonadError[Future] = new FutureMonad + override def responseMonad: MonadError[R] = rm } +class OkHttpFutureClientHandler private (client: OkHttpClient)( + implicit ec: ExecutionContext) + extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {} + object OkHttpFutureClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient())( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala index 1c0d3cd..9b64271 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala @@ -21,6 +21,7 @@ import com.softwaremill.sttp.asynchttpclient.cats.CatsAsyncHttpClientHandler import com.softwaremill.sttp.asynchttpclient.future.FutureAsyncHttpClientHandler import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler import com.softwaremill.sttp.asynchttpclient.scalaz.ScalazAsyncHttpClientHandler +import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler import com.softwaremill.sttp.okhttp.{ OkHttpFutureClientHandler, OkHttpSyncClientHandler @@ -156,8 +157,10 @@ class BasicTests ForceWrappedValue.catsIo) runTests("OkHttpSyncClientHandler")(OkHttpSyncClientHandler(), ForceWrappedValue.id) - runTests("OkHttpSyncClientHandler - Future")(OkHttpFutureClientHandler(), - ForceWrappedValue.future) + runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureClientHandler(), + ForceWrappedValue.future) + runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixClientHandler(), + ForceWrappedValue.monixTask) def runTests[R[_]](name: String)( implicit handler: SttpHandler[R, Nothing], diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala index 5f04126..3238c3c 100644 --- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala +++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala @@ -8,10 +8,14 @@ import akka.stream.scaladsl.Source import akka.util.ByteString import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler +import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler import com.typesafe.scalalogging.StrictLogging +import monix.execution.Scheduler.Implicits.global import monix.reactive.Observable -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.language.higherKinds class StreamingTests extends FlatSpec @@ -20,6 +24,7 @@ class StreamingTests with ScalaFutures with StrictLogging with IntegrationPatience + with ForceWrapped with TestHttpServer { override val serverRoutes: Route = @@ -33,59 +38,56 @@ class StreamingTests } } - override def port = 51824 - - val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem) - val monixHandler = MonixAsyncHttpClientHandler() - - akkaStreamingTests() - monixStreamingTests() + type BodyProducer[S] = String => S + type BodyConsumer[S] = S => String + override def port = 51824 val body = "streaming test" - def akkaStreamingTests(): Unit = { - implicit val handler = akkaHandler - - "Akka HTTP" should "stream request body" in { - val response = sttp - .post(uri"$endpoint/echo") - .streamBody(Source.single(ByteString(body))) - .send() - .futureValue - - response.body should be(body) - } - - it should "receive a stream" in { - val response = sttp - .post(uri"$endpoint/echo") - .body(body) - .response(asStream[Source[ByteString, Any]]) - .send() - .futureValue - - val responseBody = response.body.runReduce(_ ++ _).futureValue.utf8String - - responseBody should be(body) - } - } - - def monixStreamingTests(): Unit = { - implicit val handler = monixHandler - import monix.execution.Scheduler.Implicits.global - - val body = "streaming test" - - "Monix Async Http Client" should "stream request body" in { - val source = Observable.fromIterable( - body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) - + val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem) + val monixAsyncHttpClient = MonixAsyncHttpClientHandler() + val monixOkHttpClient = OkHttpMonixClientHandler() + + val akkaHttpBodyProducer: BodyProducer[Source[ByteString, Any]] = s => + Source.single(ByteString(s)) + val akkaHttpBodyConsumer: BodyConsumer[Source[ByteString, Any]] = + _.runReduce(_ ++ _).futureValue.utf8String + + val monixBodyProducer: BodyProducer[Observable[ByteBuffer]] = + s => + Observable.fromIterable( + s.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b)))) + + val monixBodyConsumer: BodyConsumer[Observable[ByteBuffer]] = stream => + new String(stream + .flatMap(bb => Observable.fromIterable(bb.array())) + .toListL + .runAsync + .futureValue + .toArray, + "utf-8") + + runTests("Akka HTTP", akkaHttpBodyProducer, akkaHttpBodyConsumer)( + akkaHandler, + ForceWrappedValue.future) + runTests("Monix Async Http Client", monixBodyProducer, monixBodyConsumer)( + monixAsyncHttpClient, + ForceWrappedValue.monixTask) + runTests("Monix OkHttp Client", monixBodyProducer, monixBodyConsumer)( + monixOkHttpClient, + ForceWrappedValue.monixTask) + + def runTests[R[_], S](name: String, + bodyProducer: BodyProducer[S], + bodyConsumer: BodyConsumer[S])( + implicit handler: SttpHandler[R, S], + forceResponse: ForceWrappedValue[R]): Unit = { + name should "stream request body" in { val response = sttp .post(uri"$endpoint/echo") - .streamBody(source) + .streamBody(bodyProducer(body)) .send() - .runAsync - .futureValue + .force() response.body should be(body) } @@ -94,19 +96,11 @@ class StreamingTests val response = sttp .post(uri"$endpoint/echo") .body(body) - .response(asStream[Observable[ByteBuffer]]) + .response(asStream[S]) .send() - .runAsync - .futureValue + .force() - val bytes = response.body - .flatMap(bb => Observable.fromIterable(bb.array())) - .toListL - .runAsync - .futureValue - .toArray - - new String(bytes, "utf-8") should be(body) + bodyConsumer(response.body) should be(body) } it should "receive a stream from an https site" in { @@ -115,25 +109,17 @@ class StreamingTests // in tests, but that's so much easier than setting up an https // testing server .get(uri"https://softwaremill.com") - .response(asStream[Observable[ByteBuffer]]) + .response(asStream[S]) .send() - .runAsync - .futureValue - - val bytes = response.body - .flatMap(bb => Observable.fromIterable(bb.array())) - .toListL - .runAsync - .futureValue - .toArray + .force() - new String(bytes, "utf-8") should include("</div>") + bodyConsumer(response.body) should include("</div>") } } override protected def afterAll(): Unit = { akkaHandler.close() - monixHandler.close() + monixAsyncHttpClient.close() super.afterAll() } } |