From b217185d8bd4297aef1e91b34f295620a1832e0b Mon Sep 17 00:00:00 2001 From: Omar Alejandro Mainegra Sarduy Date: Fri, 4 Aug 2017 18:14:28 -0400 Subject: Add new Monix handler --- build.sbt | 18 ++++-- .../okhttp/monix/OkHttpMonixClientHandler.scala | 72 ++++++++++++++++++++++ 2 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala diff --git a/build.sbt b/build.sbt index 332e80b..8e105a0 100644 --- a/build.sbt +++ b/build.sbt @@ -33,6 +33,9 @@ val commonSettings = Seq( val akkaHttpVersion = "10.0.9" val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion +val monixVersion = "2.3.0" +val monix = "io.monix" %% "monix" % monixVersion + val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3" lazy val rootProject = (project in file(".")) @@ -47,6 +50,7 @@ lazy val rootProject = (project in file(".")) monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, + okhttpMonixClientHandler, tests ) @@ -101,9 +105,7 @@ lazy val monixAsyncHttpClientHandler: Project = (project in file( .settings(commonSettings: _*) .settings( name := "async-http-client-handler-monix", - libraryDependencies ++= Seq( - "io.monix" %% "monix" % "2.3.0" - ) + libraryDependencies ++= Seq(monix) ) dependsOn asyncHttpClientHandler lazy val catsAsyncHttpClientHandler: Project = (project in file( @@ -126,6 +128,14 @@ lazy val okhttpClientHandler: Project = (project in file( ) ) dependsOn core +lazy val okhttpMonixClientHandler: Project = (project in file( + "okhttp-client-handler/monix")) + .settings(commonSettings: _*) + .settings( + name := "okhttp-client-handler-monix", + libraryDependencies ++= Seq(monix) + ) dependsOn okhttpClientHandler + lazy val tests: Project = (project in file("tests")) .settings(commonSettings: _*) .settings( @@ -140,4 +150,4 @@ lazy val tests: Project = (project in file("tests")) ).map(_ % "test"), libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test" ) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler, -monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler) +monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler) diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala new file mode 100644 index 0000000..ea4af14 --- /dev/null +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -0,0 +1,72 @@ +package com.softwaremill.sttp.okhttp.monix + +import java.nio.ByteBuffer + +import com.softwaremill.sttp._ +import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler +import monix.eval.{Callback, Task} +import monix.execution.cancelables.AssignableCancelable +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.observers.Subscriber +import monix.reactive.{Consumer, Observable} +import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} +import okio.{BufferedSink, Okio} + +import scala.language.higherKinds +import scala.util.{Failure, Success, Try} + +/** + * Created by omainegra on 8/4/17. + */ +class OkHttpMonixClientHandler private (client: OkHttpClient)( + implicit s: Scheduler) + extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client, + TaskMonad) { + + override def streamToRequestBody( + stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = + Some(new OkHttpRequestBody() { + override def writeTo(sink: BufferedSink): Unit = + stream + .consumeWith( + Consumer.foreach(chunck => sink.write(chunck.array())) + ) + .runAsync + + override def contentType(): MediaType = null + }) + + override def responseBodyToStream( + res: okhttp3.Response): Try[Observable[ByteBuffer]] = + Success( + Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap)) +} + +object OkHttpMonixClientHandler { + def apply(okhttpClient: OkHttpClient = new OkHttpClient())( + implicit s: Scheduler = Scheduler.Implicits.global) + : OkHttpMonixClientHandler = + new OkHttpMonixClientHandler(okhttpClient)(s) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} -- cgit v1.2.3