aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Warski <adam@warski.org>2017-08-28 16:45:33 +0200
committerGitHub <noreply@github.com>2017-08-28 16:45:33 +0200
commitf1b4a997b0ccda703971a3baa0eff007097cb0da (patch)
treea3c8b50bf3c613ae0dade6b9778b3e6b393d7846
parent589820beefd742e8b01f337a49e060ef9b2be3ea (diff)
parent8733320e18faa4d2d2d66a96de503469fc236e46 (diff)
downloadsttp-f1b4a997b0ccda703971a3baa0eff007097cb0da.tar.gz
sttp-f1b4a997b0ccda703971a3baa0eff007097cb0da.tar.bz2
sttp-f1b4a997b0ccda703971a3baa0eff007097cb0da.zip
Merge pull request #24 from omainegra/okhttp3-monix
Okhttp3 monix
-rw-r--r--README.md3
-rw-r--r--build.sbt20
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala108
-rw-r--r--okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala72
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala7
-rw-r--r--tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala128
6 files changed, 231 insertions, 107 deletions
diff --git a/README.md b/README.md
index 18688a6..b5a3179 100644
--- a/README.md
+++ b/README.md
@@ -190,7 +190,8 @@ uri"$scheme://$subdomains.example.com?x=$vx&$params#$jumpTo"
| `MonixAsyncHttpClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` |
| `CatsAsyncHttpClientHandler` | `F[_]: cats.effect.Async` | - |
| `OkHttpSyncClientHandler` | None (`Id`) | - |
-| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - |
+| `OkHttpFutureClientHandler` | `scala.concurrent.Future` | - |
+| `OkHttpMonixClientHandler` | `monix.eval.Task` | `monix.reactive.Observable[ByteBuffer]` |
### `HttpURLConnectionSttpHandler`
diff --git a/build.sbt b/build.sbt
index a68dd25..588df88 100644
--- a/build.sbt
+++ b/build.sbt
@@ -32,7 +32,12 @@ val commonSettings = Seq(
val akkaHttpVersion = "10.0.9"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
+
+val monixVersion = "2.3.0"
+val monix = "io.monix" %% "monix" % monixVersion
+
val circeVersion = "0.8.0"
+
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3"
lazy val rootProject = (project in file("."))
@@ -47,6 +52,7 @@ lazy val rootProject = (project in file("."))
monixAsyncHttpClientHandler,
catsAsyncHttpClientHandler,
okhttpClientHandler,
+ okhttpMonixClientHandler,
circe,
tests
)
@@ -102,9 +108,7 @@ lazy val monixAsyncHttpClientHandler: Project = (project in file(
.settings(commonSettings: _*)
.settings(
name := "async-http-client-handler-monix",
- libraryDependencies ++= Seq(
- "io.monix" %% "monix" % "2.3.0"
- )
+ libraryDependencies ++= Seq(monix)
) dependsOn asyncHttpClientHandler
lazy val catsAsyncHttpClientHandler: Project = (project in file(
@@ -127,6 +131,14 @@ lazy val okhttpClientHandler: Project = (project in file(
)
) dependsOn core
+lazy val okhttpMonixClientHandler: Project = (project in file(
+ "okhttp-client-handler/monix"))
+ .settings(commonSettings: _*)
+ .settings(
+ name := "okhttp-client-handler-monix",
+ libraryDependencies ++= Seq(monix)
+ ) dependsOn okhttpClientHandler
+
lazy val circe: Project = (project in file("circe"))
.settings(commonSettings: _*)
.settings(
@@ -152,4 +164,4 @@ lazy val tests: Project = (project in file("tests"))
).map(_ % "test"),
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test"
) dependsOn (core, akkaHttpHandler, futureAsyncHttpClientHandler, scalazAsyncHttpClientHandler,
-monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler)
+monixAsyncHttpClientHandler, catsAsyncHttpClientHandler, okhttpClientHandler, okhttpMonixClientHandler)
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
new file mode 100644
index 0000000..1401dee
--- /dev/null
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -0,0 +1,108 @@
+package com.softwaremill.sttp.okhttp.monix
+
+import java.nio.ByteBuffer
+import java.util.concurrent.ArrayBlockingQueue
+
+import com.softwaremill.sttp.{SttpHandler, _}
+import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler
+import monix.eval.Task
+import monix.execution.Ack.Continue
+import monix.execution.{Ack, Cancelable, Scheduler}
+import monix.reactive.Observable
+import monix.reactive.observers.Subscriber
+import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
+import okio.BufferedSink
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Created by omainegra on 8/4/17.
+ */
+class OkHttpMonixClientHandler private (client: OkHttpClient)(
+ implicit s: Scheduler)
+ extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client,
+ TaskMonad) {
+
+ override def streamToRequestBody(
+ stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
+ Some(new OkHttpRequestBody() {
+ override def writeTo(sink: BufferedSink): Unit =
+ toIterable(stream) map (_.array()) foreach sink.write
+ override def contentType(): MediaType = null
+ })
+
+ override def responseBodyToStream(
+ res: okhttp3.Response): Try[Observable[ByteBuffer]] =
+ Success(
+ Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap))
+
+ private def toIterable[T](observable: Observable[T])(
+ implicit s: Scheduler): Iterable[T] =
+ new Iterable[T] {
+ override def iterator: Iterator[T] = new Iterator[T] {
+ case object Completed extends Exception
+
+ val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1)
+
+ observable.executeWithFork.subscribe(new Subscriber[T] {
+ override implicit def scheduler: Scheduler = s
+
+ override def onError(ex: Throwable): Unit = {
+ blockingQueue.put(Left(ex))
+ }
+
+ override def onComplete(): Unit = {
+ blockingQueue.put(Left(Completed))
+ }
+
+ override def onNext(elem: T): Future[Ack] = {
+ blockingQueue.put(Right(elem))
+ Continue
+ }
+ })
+
+ var value: T = _
+
+ override def hasNext: Boolean =
+ blockingQueue.take() match {
+ case Left(Completed) => false
+ case Right(elem) =>
+ value = elem
+ true
+ case Left(ex) => throw ex
+ }
+
+ override def next(): T = value
+ }
+ }
+}
+
+object OkHttpMonixClientHandler {
+ def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
+ implicit s: Scheduler = Scheduler.Implicits.global)
+ : SttpHandler[Task, Observable[ByteBuffer]] =
+ new OkHttpMonixClientHandler(okhttpClient)(s)
+}
+
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+}
diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
index f7c4466..00ea10c 100644
--- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
+++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
@@ -19,7 +19,7 @@ import okhttp3.{
import okio.{BufferedSink, Okio}
import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.{Failure, Try}
@@ -29,7 +29,7 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
val builder = new OkHttpRequest.Builder()
.url(request.uri.toString)
- val body = setBody(request.body)
+ val body = setBody(request)
builder.method(request.method.m, body.getOrElse {
if (HttpMethod.requiresRequestBody(request.method.m))
OkHttpRequestBody.create(null, "")
@@ -46,12 +46,13 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
builder.build()
}
- private def setBody(requestBody: RequestBody[S]): Option[OkHttpRequestBody] = {
- requestBody match {
+ private def setBody[T](request: Request[T, S]): Option[OkHttpRequestBody] = {
+ request.body match {
case NoBody => None
- case StringBody(b, encoding, _) =>
- Some(OkHttpRequestBody.create(MediaType.parse(encoding), b))
- case ByteArrayBody(b, _) => Some(OkHttpRequestBody.create(null, b))
+ case StringBody(b, _, _) =>
+ Some(OkHttpRequestBody.create(null, b))
+ case ByteArrayBody(b, _) =>
+ Some(OkHttpRequestBody.create(null, b))
case ByteBufferBody(b, _) =>
Some(OkHttpRequestBody.create(null, b.array()))
case InputStreamBody(b, _) =>
@@ -60,14 +61,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
sink.writeAll(Okio.source(b))
override def contentType(): MediaType = null
})
- case PathBody(b, _) => Some(OkHttpRequestBody.create(null, b.toFile))
- case StreamBody(s) => None
+ case PathBody(b, _) =>
+ Some(OkHttpRequestBody.create(null, b.toFile))
+ case StreamBody(s) =>
+ streamToRequestBody(s)
}
}
private[okhttp] def readResponse[T](
res: OkHttpResponse,
responseAs: ResponseAs[T, S]): R[Response[T]] = {
+
val body = responseHandler(res).handle(responseAs, responseMonad)
val headers = res
@@ -87,12 +91,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
case ResponseAsString(encoding) =>
Try(res.body().source().readString(Charset.forName(encoding)))
case ResponseAsByteArray => Try(res.body().bytes())
- case ResponseAsStream() =>
- Failure(new IllegalStateException("Streaming isn't supported"))
+ case ras @ ResponseAsStream() =>
+ responseBodyToStream(res).map(ras.responseIsStream)
case ResponseAsFile(file, overwrite) =>
Try(ResponseAs.saveFile(file, res.body().byteStream(), overwrite))
}
}
+
+ def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None
+
+ def responseBodyToStream(res: OkHttpResponse): Try[S] =
+ Failure(new IllegalStateException("Streaming isn't supported"))
}
class OkHttpSyncClientHandler private (client: OkHttpClient)
@@ -108,35 +117,40 @@ class OkHttpSyncClientHandler private (client: OkHttpClient)
object OkHttpSyncClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())
- : SttpHandler[Id, Nothing] =
+ : OkHttpSyncClientHandler =
new OkHttpSyncClientHandler(okhttpClient)
}
-class OkHttpFutureClientHandler private (client: OkHttpClient)(
- implicit ec: ExecutionContext)
- extends OkHttpClientHandler[Future, Nothing](client) {
-
- override def send[T](r: Request[T, Nothing]): Future[Response[T]] = {
+abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient,
+ rm: MonadAsyncError[R])
+ extends OkHttpClientHandler[R, S](client) {
+ override def send[T](r: Request[T, S]): R[Response[T]] = {
val request = convertRequest(r)
- val promise = Promise[Future[Response[T]]]()
- client
- .newCall(request)
- .enqueue(new Callback {
- override def onFailure(call: Call, e: IOException): Unit =
- promise.failure(e)
+ rm.flatten(rm.async[R[Response[T]]] { cb =>
+ def success(r: R[Response[T]]) = cb(Right(r))
+ def error(t: Throwable) = cb(Left(t))
- override def onResponse(call: Call, response: OkHttpResponse): Unit =
- try promise.success(readResponse(response, r.responseAs))
- catch { case e: Exception => promise.failure(e) }
- })
+ client
+ .newCall(request)
+ .enqueue(new Callback {
+ override def onFailure(call: Call, e: IOException): Unit =
+ error(e)
- responseMonad.flatten(promise.future)
+ override def onResponse(call: Call, response: OkHttpResponse): Unit =
+ try success(readResponse(response, r.responseAs))
+ catch { case e: Exception => error(e) }
+ })
+ })
}
- override def responseMonad: MonadError[Future] = new FutureMonad
+ override def responseMonad: MonadError[R] = rm
}
+class OkHttpFutureClientHandler private (client: OkHttpClient)(
+ implicit ec: ExecutionContext)
+ extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {}
+
object OkHttpFutureClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
diff --git a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
index 1c0d3cd..9b64271 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/BasicTests.scala
@@ -21,6 +21,7 @@ import com.softwaremill.sttp.asynchttpclient.cats.CatsAsyncHttpClientHandler
import com.softwaremill.sttp.asynchttpclient.future.FutureAsyncHttpClientHandler
import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
import com.softwaremill.sttp.asynchttpclient.scalaz.ScalazAsyncHttpClientHandler
+import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler
import com.softwaremill.sttp.okhttp.{
OkHttpFutureClientHandler,
OkHttpSyncClientHandler
@@ -156,8 +157,10 @@ class BasicTests
ForceWrappedValue.catsIo)
runTests("OkHttpSyncClientHandler")(OkHttpSyncClientHandler(),
ForceWrappedValue.id)
- runTests("OkHttpSyncClientHandler - Future")(OkHttpFutureClientHandler(),
- ForceWrappedValue.future)
+ runTests("OkHttpAsyncClientHandler - Future")(OkHttpFutureClientHandler(),
+ ForceWrappedValue.future)
+ runTests("OkHttpAsyncClientHandler - Monix")(OkHttpMonixClientHandler(),
+ ForceWrappedValue.monixTask)
def runTests[R[_]](name: String)(
implicit handler: SttpHandler[R, Nothing],
diff --git a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
index 5f04126..3238c3c 100644
--- a/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
+++ b/tests/src/test/scala/com/softwaremill/sttp/StreamingTests.scala
@@ -8,10 +8,14 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.softwaremill.sttp.akkahttp.AkkaHttpSttpHandler
import com.softwaremill.sttp.asynchttpclient.monix.MonixAsyncHttpClientHandler
+import com.softwaremill.sttp.okhttp.monix.OkHttpMonixClientHandler
import com.typesafe.scalalogging.StrictLogging
+import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.language.higherKinds
class StreamingTests
extends FlatSpec
@@ -20,6 +24,7 @@ class StreamingTests
with ScalaFutures
with StrictLogging
with IntegrationPatience
+ with ForceWrapped
with TestHttpServer {
override val serverRoutes: Route =
@@ -33,59 +38,56 @@ class StreamingTests
}
}
- override def port = 51824
-
- val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem)
- val monixHandler = MonixAsyncHttpClientHandler()
-
- akkaStreamingTests()
- monixStreamingTests()
+ type BodyProducer[S] = String => S
+ type BodyConsumer[S] = S => String
+ override def port = 51824
val body = "streaming test"
- def akkaStreamingTests(): Unit = {
- implicit val handler = akkaHandler
-
- "Akka HTTP" should "stream request body" in {
- val response = sttp
- .post(uri"$endpoint/echo")
- .streamBody(Source.single(ByteString(body)))
- .send()
- .futureValue
-
- response.body should be(body)
- }
-
- it should "receive a stream" in {
- val response = sttp
- .post(uri"$endpoint/echo")
- .body(body)
- .response(asStream[Source[ByteString, Any]])
- .send()
- .futureValue
-
- val responseBody = response.body.runReduce(_ ++ _).futureValue.utf8String
-
- responseBody should be(body)
- }
- }
-
- def monixStreamingTests(): Unit = {
- implicit val handler = monixHandler
- import monix.execution.Scheduler.Implicits.global
-
- val body = "streaming test"
-
- "Monix Async Http Client" should "stream request body" in {
- val source = Observable.fromIterable(
- body.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
-
+ val akkaHandler = AkkaHttpSttpHandler.usingActorSystem(actorSystem)
+ val monixAsyncHttpClient = MonixAsyncHttpClientHandler()
+ val monixOkHttpClient = OkHttpMonixClientHandler()
+
+ val akkaHttpBodyProducer: BodyProducer[Source[ByteString, Any]] = s =>
+ Source.single(ByteString(s))
+ val akkaHttpBodyConsumer: BodyConsumer[Source[ByteString, Any]] =
+ _.runReduce(_ ++ _).futureValue.utf8String
+
+ val monixBodyProducer: BodyProducer[Observable[ByteBuffer]] =
+ s =>
+ Observable.fromIterable(
+ s.getBytes("utf-8").map(b => ByteBuffer.wrap(Array(b))))
+
+ val monixBodyConsumer: BodyConsumer[Observable[ByteBuffer]] = stream =>
+ new String(stream
+ .flatMap(bb => Observable.fromIterable(bb.array()))
+ .toListL
+ .runAsync
+ .futureValue
+ .toArray,
+ "utf-8")
+
+ runTests("Akka HTTP", akkaHttpBodyProducer, akkaHttpBodyConsumer)(
+ akkaHandler,
+ ForceWrappedValue.future)
+ runTests("Monix Async Http Client", monixBodyProducer, monixBodyConsumer)(
+ monixAsyncHttpClient,
+ ForceWrappedValue.monixTask)
+ runTests("Monix OkHttp Client", monixBodyProducer, monixBodyConsumer)(
+ monixOkHttpClient,
+ ForceWrappedValue.monixTask)
+
+ def runTests[R[_], S](name: String,
+ bodyProducer: BodyProducer[S],
+ bodyConsumer: BodyConsumer[S])(
+ implicit handler: SttpHandler[R, S],
+ forceResponse: ForceWrappedValue[R]): Unit = {
+ name should "stream request body" in {
val response = sttp
.post(uri"$endpoint/echo")
- .streamBody(source)
+ .streamBody(bodyProducer(body))
.send()
- .runAsync
- .futureValue
+ .force()
response.body should be(body)
}
@@ -94,19 +96,11 @@ class StreamingTests
val response = sttp
.post(uri"$endpoint/echo")
.body(body)
- .response(asStream[Observable[ByteBuffer]])
+ .response(asStream[S])
.send()
- .runAsync
- .futureValue
+ .force()
- val bytes = response.body
- .flatMap(bb => Observable.fromIterable(bb.array()))
- .toListL
- .runAsync
- .futureValue
- .toArray
-
- new String(bytes, "utf-8") should be(body)
+ bodyConsumer(response.body) should be(body)
}
it should "receive a stream from an https site" in {
@@ -115,25 +109,17 @@ class StreamingTests
// in tests, but that's so much easier than setting up an https
// testing server
.get(uri"https://softwaremill.com")
- .response(asStream[Observable[ByteBuffer]])
+ .response(asStream[S])
.send()
- .runAsync
- .futureValue
-
- val bytes = response.body
- .flatMap(bb => Observable.fromIterable(bb.array()))
- .toListL
- .runAsync
- .futureValue
- .toArray
+ .force()
- new String(bytes, "utf-8") should include("</div>")
+ bodyConsumer(response.body) should include("</div>")
}
}
override protected def afterAll(): Unit = {
akkaHandler.close()
- monixHandler.close()
+ monixAsyncHttpClient.close()
super.afterAll()
}
}