aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmar Alejandro Mainegra Sarduy <omainegra@gmail.com>2017-08-14 11:11:08 -0400
committerOmar Alejandro Mainegra Sarduy <omainegra@gmail.com>2017-08-14 11:26:48 -0400
commita21bbe418697195128895914184242368ff036b8 (patch)
treeb508b7d006e78a1bdeaab482f8bf1ce57b55491d
parent065d207a9b9b17e370b93bf17c2a8edf5ef472fe (diff)
downloadsttp-a21bbe418697195128895914184242368ff036b8.tar.gz
sttp-a21bbe418697195128895914184242368ff036b8.tar.bz2
sttp-a21bbe418697195128895914184242368ff036b8.zip
Improve streaming part
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala71
1 files changed, 50 insertions, 21 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
index e792c44..1401dee 100644
--- a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -1,18 +1,19 @@
package com.softwaremill.sttp.okhttp.monix
import java.nio.ByteBuffer
+import java.util.concurrent.ArrayBlockingQueue
-import com.softwaremill.sttp._
+import com.softwaremill.sttp.{SttpHandler, _}
import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler
import monix.eval.Task
-import monix.execution.{Cancelable, Scheduler}
-import monix.reactive.{Consumer, Observable}
+import monix.execution.Ack.Continue
+import monix.execution.{Ack, Cancelable, Scheduler}
+import monix.reactive.Observable
+import monix.reactive.observers.Subscriber
import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
import okio.BufferedSink
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.language.higherKinds
+import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
/**
@@ -23,23 +24,11 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)(
extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client,
TaskMonad) {
- private lazy val io = Scheduler.io("sttp-monix-io")
-
override def streamToRequestBody(
stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
Some(new OkHttpRequestBody() {
- override def writeTo(sink: BufferedSink): Unit = {
- val f = stream
- .consumeWith(
- Consumer.foreach(chunk => sink.write(chunk.array()))
- )
- .runAsync(io)
-
- // We could safely block until the observable is consumed because OkHttp execute
- // this method asynchronous in another ThreadPool.
- Await.ready(f, Duration.Inf)
- }
-
+ override def writeTo(sink: BufferedSink): Unit =
+ toIterable(stream) map (_.array()) foreach sink.write
override def contentType(): MediaType = null
})
@@ -47,12 +36,52 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)(
res: okhttp3.Response): Try[Observable[ByteBuffer]] =
Success(
Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap))
+
+ private def toIterable[T](observable: Observable[T])(
+ implicit s: Scheduler): Iterable[T] =
+ new Iterable[T] {
+ override def iterator: Iterator[T] = new Iterator[T] {
+ case object Completed extends Exception
+
+ val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1)
+
+ observable.executeWithFork.subscribe(new Subscriber[T] {
+ override implicit def scheduler: Scheduler = s
+
+ override def onError(ex: Throwable): Unit = {
+ blockingQueue.put(Left(ex))
+ }
+
+ override def onComplete(): Unit = {
+ blockingQueue.put(Left(Completed))
+ }
+
+ override def onNext(elem: T): Future[Ack] = {
+ blockingQueue.put(Right(elem))
+ Continue
+ }
+ })
+
+ var value: T = _
+
+ override def hasNext: Boolean =
+ blockingQueue.take() match {
+ case Left(Completed) => false
+ case Right(elem) =>
+ value = elem
+ true
+ case Left(ex) => throw ex
+ }
+
+ override def next(): T = value
+ }
+ }
}
object OkHttpMonixClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
implicit s: Scheduler = Scheduler.Implicits.global)
- : OkHttpMonixClientHandler =
+ : SttpHandler[Task, Observable[ByteBuffer]] =
new OkHttpMonixClientHandler(okhttpClient)(s)
}