aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmar Alejandro Mainegra Sarduy <omainegra@gmail.com>2017-08-04 18:14:28 -0400
committerOmar Alejandro Mainegra Sarduy <omainegra@gmail.com>2017-08-04 18:14:28 -0400
commitb217185d8bd4297aef1e91b34f295620a1832e0b (patch)
tree571c08d3fb7e3cd328b0b91f60a981eea9ab203c
parent1a9b0b61e18c9b57fa52cebeb80fb1caddf58186 (diff)
downloadsttp-b217185d8bd4297aef1e91b34f295620a1832e0b.tar.gz
sttp-b217185d8bd4297aef1e91b34f295620a1832e0b.tar.bz2
sttp-b217185d8bd4297aef1e91b34f295620a1832e0b.zip
Add new Monix handler
-rw-r--r--build.sbt18
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala72
2 files changed, 86 insertions, 4 deletions
diff --git a/build.sbt b/build.sbt
index 332e80b..8e105a0 100644
--- a/build.sbt
+++ b/build.sbt
@@ -33,6 +33,9 @@ val commonSettings = Seq(
val akkaHttpVersion = "10.0.9"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
+val monixVersion = "2.3.0"
+val monix = "io.monix" %% "monix" % monixVersion
+
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3"
lazy val rootProject = (project in file("."))
@@ -47,6 +50,7 @@ lazy val rootProject = (project in file("."))
monixAsyncHttpClientHandler,
catsAsyncHttpClientHandler,
okhttpClientHandler,
+ okhttpMonixClientHandler,
tests
)
@@ -101,9 +105,7 @@ lazy val monixAsyncHttpClientHandler: Project = (project in file(
.settings(commonSettings: _*)
.settings(
name := "async-http-client-handler-monix",
- libraryDependencies ++= Seq(
- "io.monix" %% "monix" % "2.3.0"
- )
+ libraryDependencies ++= Seq(monix)
) dependsOn asyncHttpClientHandler
lazy val catsAsyncHttpClientHandler: Project = (project in file(
@@ -126,6 +128,14 @@ lazy val okhttpClientHandler: Project = (project in file(
)
) dependsOn core
+lazy val okhttpMonixClientHandler: Project = (project in file(
+ "okhttp-client-handler/monix"))
+ .settings(commonSettings: _*)
+ .settings(
+ name := "okhttp-client-handler-monix",
+ libraryDependencies ++= Seq(monix)
+ ) dependsOn okhttpClientHandler
+
lazy val tests: Project = (project in file("tests"))
.settings(commonSettings: _*)
.settings(
@@ -140,4 +150,4 @@ lazy val tests: Project = (project in file("tests"))
).map(_ % "test"),
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test"
) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler,
-monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler)
+monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler)
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
new file mode 100644
index 0000000..ea4af14
--- /dev/null
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -0,0 +1,72 @@
+package com.softwaremill.sttp.okhttp.monix
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp._
+import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler
+import monix.eval.{Callback, Task}
+import monix.execution.cancelables.AssignableCancelable
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.observers.Subscriber
+import monix.reactive.{Consumer, Observable}
+import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
+import okio.{BufferedSink, Okio}
+
+import scala.language.higherKinds
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Created by omainegra on 8/4/17.
+ */
+class OkHttpMonixClientHandler private (client: OkHttpClient)(
+ implicit s: Scheduler)
+ extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client,
+ TaskMonad) {
+
+ override def streamToRequestBody(
+ stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
+ Some(new OkHttpRequestBody() {
+ override def writeTo(sink: BufferedSink): Unit =
+ stream
+ .consumeWith(
+ Consumer.foreach(chunck => sink.write(chunck.array()))
+ )
+ .runAsync
+
+ override def contentType(): MediaType = null
+ })
+
+ override def responseBodyToStream(
+ res: okhttp3.Response): Try[Observable[ByteBuffer]] =
+ Success(
+ Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap))
+}
+
+object OkHttpMonixClientHandler {
+ def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
+ implicit s: Scheduler = Scheduler.Implicits.global)
+ : OkHttpMonixClientHandler =
+ new OkHttpMonixClientHandler(okhttpClient)(s)
+}
+
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+}