aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md3
-rw-r--r--build.sbt20
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala79
-rw-r--r--okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala61
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala7
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala74
6 files changed, 207 insertions, 37 deletions
diff --git a/README.md b/README.md
index 18688a6..b5a3179 100644
--- a/README.md
+++ b/README.md
@@ -190,7 +190,8 @@ uri"$scheme://$subdomains.example.com?x=$vx&$params#$jumpTo"
| `MonixAsyncHttpClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` |
| `CatsAsyncHttpClientHandler` | `F[_]: cats.effect.Async` | - |
| `OkHttpSyncClientHandler` | None (`Id`) | - |
-| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - |
+| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - |
+| `OkHttpMonixClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` |
### `HttpURLConnectionSttpHandler`
diff --git a/build.sbt b/build.sbt
index a68dd25..588df88 100644
--- a/build.sbt
+++ b/build.sbt
@@ -32,7 +32,12 @@ val commonSettings = Seq(
val akkaHttpVersion = "10.0.9"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
+
+val monixVersion = "2.3.0"
+val monix = "io.monix" %% "monix" % monixVersion
+
val circeVersion = "0.8.0"
+
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3"
lazy val rootProject = (project in file("."))
@@ -47,6 +52,7 @@ lazy val rootProject = (project in file("."))
monixAsyncHttpClientHandler,
catsAsyncHttpClientHandler,
okhttpClientHandler,
+ okhttpMonixClientHandler,
circe,
tests
)
@@ -102,9 +108,7 @@ lazy val monixAsyncHttpClientHandler: Project = (project in file(
.settings(commonSettings: _*)
.settings(
name := "async-http-client-handler-monix",
- libraryDependencies ++= Seq(
- "io.monix" %% "monix" % "2.3.0"
- )
+ libraryDependencies ++= Seq(monix)
) dependsOn asyncHttpClientHandler
lazy val catsAsyncHttpClientHandler: Project = (project in file(
@@ -127,6 +131,14 @@ lazy val okhttpClientHandler: Project = (project in file(
)
) dependsOn core
+lazy val okhttpMonixClientHandler: Project = (project in file(
+ "okhttp-client-handler/monix"))
+ .settings(commonSettings: _*)
+ .settings(
+ name := "okhttp-client-handler-monix",
+ libraryDependencies ++= Seq(monix)
+ ) dependsOn okhttpClientHandler
+
lazy val circe: Project = (project in file("circe"))
.settings(commonSettings: _*)
.settings(
@@ -152,4 +164,4 @@ lazy val tests: Project = (project in file("tests"))
).map(_ % "test"),
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test"
) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler,
-monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler)
+monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler)
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
new file mode 100644
index 0000000..e792c44
--- /dev/null
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -0,0 +1,79 @@
+package com.softwaremill.sttp.okhttp.monix
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp._
+import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler
+import monix.eval.Task
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.{Consumer, Observable}
+import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
+import okio.BufferedSink
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.language.higherKinds
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Created by omainegra on 8/4/17.
+ */
+class OkHttpMonixClientHandler private (client: OkHttpClient)(
+ implicit s: Scheduler)
+ extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client,
+ TaskMonad) {
+
+ private lazy val io = Scheduler.io("sttp-monix-io")
+
+ override def streamToRequestBody(
+ stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
+ Some(new OkHttpRequestBody() {
+ override def writeTo(sink: BufferedSink): Unit = {
+ val f = stream
+ .consumeWith(
+ Consumer.foreach(chunk => sink.write(chunk.array()))
+ )
+ .runAsync(io)
+
+ // We could safely block until the observable is consumed because OkHttp execute
+ // this method asynchronous in another ThreadPool.
+ Await.ready(f, Duration.Inf)
+ }
+
+ override def contentType(): MediaType = null
+ })
+
+ override def responseBodyToStream(
+ res: okhttp3.Response): Try[Observable[ByteBuffer]] =
+ Success(
+ Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap))
+}
+
+object OkHttpMonixClientHandler {
+ def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
+ implicit s: Scheduler = Scheduler.Implicits.global)
+ : OkHttpMonixClientHandler =
+ new OkHttpMonixClientHandler(okhttpClient)(s)
+}
+
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+}
diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
index f7c4466..8f21eb0 100644
--- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
+++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
@@ -19,7 +19,7 @@ import okhttp3.{
import okio.{BufferedSink, Okio}
import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.{Failure, Try}
@@ -61,13 +61,14 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
override def contentType(): MediaType = null
})
case PathBody(b, _) => Some(OkHttpRequestBody.create(null, b.toFile))
- case StreamBody(s) => None
+ case StreamBody(s) => streamToRequestBody(s)
}
}
private[okhttp] def readResponse[T](
res: OkHttpResponse,
responseAs: ResponseAs[T, S]): R[Response[T]] = {
+
val body = responseHandler(res).handle(responseAs, responseMonad)
val headers = res
@@ -87,12 +88,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
case ResponseAsString(encoding) =>
Try(res.body().source().readString(Charset.forName(encoding)))
case ResponseAsByteArray => Try(res.body().bytes())
- case ResponseAsStream() =>
- Failure(new IllegalStateException("Streaming isn't supported"))
+ case ras @ ResponseAsStream() =>
+ responseBodyToStream(res).map(ras.responseIsStream)
case ResponseAsFile(file, overwrite) =>
Try(ResponseAs.saveFile(file, res.body().byteStream(), overwrite))
}
}
+
+ def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None
+
+ def responseBodyToStream(res: OkHttpResponse): Try[S] =
+ Failure(new IllegalStateException("Streaming isn't supported"))
}
class OkHttpSyncClientHandler private (client: OkHttpClient)
@@ -108,38 +114,43 @@ class OkHttpSyncClientHandler private (client: OkHttpClient)
object OkHttpSyncClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())
- : SttpHandler[Id, Nothing] =
+ : OkHttpSyncClientHandler =
new OkHttpSyncClientHandler(okhttpClient)
}
-class OkHttpFutureClientHandler private (client: OkHttpClient)(
- implicit ec: ExecutionContext)
- extends OkHttpClientHandler[Future, Nothing](client) {
-
- override def send[T](r: Request[T, Nothing]): Future[Response[T]] = {
+abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient,
+ rm: MonadAsyncError[R])
+ extends OkHttpClientHandler[R, S](client) {
+ override def send[T](r: Request[T, S]): R[Response[T]] = {
val request = convertRequest(r)
- val promise = Promise[Future[Response[T]]]()
-
- client
- .newCall(request)
- .enqueue(new Callback {
- override def onFailure(call: Call, e: IOException): Unit =
- promise.failure(e)
- override def onResponse(call: Call, response: OkHttpResponse): Unit =
- try promise.success(readResponse(response, r.responseAs))
- catch { case e: Exception => promise.failure(e) }
- })
-
- responseMonad.flatten(promise.future)
+ rm.flatten(rm.async[R[Response[T]]] { cb =>
+ def success(r: R[Response[T]]) = cb(Right(r))
+ def error(t: Throwable) = cb(Left(t))
+
+ client
+ .newCall(request)
+ .enqueue(new Callback {
+ override def onFailure(call: Call, e: IOException): Unit =
+ error(e)
+
+ override def onResponse(call: Call, response: OkHttpResponse): Unit =
+ try success(readResponse(response, r.responseAs))
+ catch { case e: Exception => error(e) }
+ })
+ })
}
- override def responseMonad: MonadError[Future] = new FutureMonad
+ override def responseMonad: MonadError[R] = rm
}
+class OkHttpFutureClientHandler private (client: OkHttpClient)(
+ implicit ec: ExecutionContext)
+ extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {}
+
object OkHttpFutureClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[Future, Nothing] =
+ : OkHttpFutureClientHandler =
new OkHttpFutureClientHandler(okhttpClient)
}
diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
index 1c0d3cd..9b64271 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
@@ -21,6 +21,7 @@ import com.softwaremill.sttp.asynchttpclient.cats.CatsAsyncHttpClientHandler
import com.softwaremill.sttp.asynchttpclient.future.FutureAsyncHttpClientHandler
import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
import com.softwaremill.sttp.asynchttpclient.scalaz.ScalazAsyncHttpClientHandler
+import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler
import com.softwaremill.sttp.okhttp.{
OkHttpFutureClientHandler,
OkHttpSyncClientHandler
@@ -156,8 +157,10 @@ class BasicTests
ForceWrappedValue.catsIo)
runTests("OkHttpSyncClientHandler")(OkHttpSyncClientHandler(),
ForceWrappedValue.id)
- runTests("OkHttpSyncClientHandler - Future")(OkHttpFutureClientHandler(),
- ForceWrappedValue.future)
+ runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureClientHandler(),
+ ForceWrappedValue.future)
+ runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixClientHandler(),
+ ForceWrappedValue.monixTask)
def runTests[R[_]](name: String)(
implicit handler: SttpHandler[R, Nothing],
diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
index 5f04126..5820153 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
@@ -8,6 +8,7 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler
import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
+import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler
import com.typesafe.scalalogging.StrictLogging
import monix.reactive.Observable
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
@@ -36,10 +37,12 @@ class StreamingTests
override def port = 51824
val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem)
- val monixHandler = MonixAsyncHttpClientHandler()
+ val monixAsyncHttpClient = MonixAsyncHttpClientHandler()
+ val monixOkHttpClient = OkHttpMonixClientHandler()
akkaStreamingTests()
- monixStreamingTests()
+ monixAsyncHttpClientStreamingTests()
+ monixOkHttpClientStreamingTests()
val body = "streaming test"
@@ -70,8 +73,8 @@ class StreamingTests
}
}
- def monixStreamingTests(): Unit = {
- implicit val handler = monixHandler
+ def monixOkHttpClientStreamingTests(): Unit = {
+ implicit val handler = monixOkHttpClient
import monix.execution.Scheduler.Implicits.global
val body = "streaming test"
@@ -131,9 +134,70 @@ class StreamingTests
}
}
+ def monixAsyncHttpClientStreamingTests(): Unit = {
+ implicit val handler = monixAsyncHttpClient
+ import monix.execution.Scheduler.Implicits.global
+
+ val body = "streaming test"
+
+ "Monix OkHttp Client" should "stream request body" in {
+ val source = Observable.fromIterable(
+ body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
+
+ val response = sttp
+ .post(uri"$endpoint/echo")
+ .streamBody(source)
+ .send()
+ .runAsync
+ .futureValue
+
+ response.body should be(body)
+ }
+
+ it should "receive a stream" in {
+ val response = sttp
+ .post(uri"$endpoint/echo")
+ .body(body)
+ .response(asStream[Observable[ByteBuffer]])
+ .send()
+ .runAsync
+ .futureValue
+
+ val bytes = response.body
+ .flatMap(bb => Observable.fromIterable(bb.array()))
+ .toListL
+ .runAsync
+ .futureValue
+ .toArray
+
+ new String(bytes, "utf-8") should be(body)
+ }
+
+ it should "receive a stream from an https site" in {
+ val response = sttp
+ // of course, you should never rely on the internet being available
+ // in tests, but that's so much easier than setting up an https
+ // testing server
+ .get(uri"https://softwaremill.com")
+ .response(asStream[Observable[ByteBuffer]])
+ .send()
+ .runAsync
+ .futureValue
+
+ val bytes = response.body
+ .flatMap(bb => Observable.fromIterable(bb.array()))
+ .toListL
+ .runAsync
+ .futureValue
+ .toArray
+
+ new String(bytes, "utf-8") should include("</div>")
+ }
+ }
+
override protected def afterAll(): Unit = {
akkaHandler.close()
- monixHandler.close()
+ monixAsyncHttpClient.close()
super.afterAll()
}
}