diff options
author | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-14 11:11:08 -0400 |
---|---|---|
committer | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-14 11:26:48 -0400 |
commit | a21bbe418697195128895914184242368ff036b8 (patch) | |
tree | b508b7d006e78a1bdeaab482f8bf1ce57b55491d /okhttp-client-handler | |
parent | 065d207a9b9b17e370b93bf17c2a8edf5ef472fe (diff) | |
download | sttp-a21bbe418697195128895914184242368ff036b8.tar.gz sttp-a21bbe418697195128895914184242368ff036b8.tar.bz2 sttp-a21bbe418697195128895914184242368ff036b8.zip |
Improve streaming part
Diffstat (limited to 'okhttp-client-handler')
-rw-r--r-- | okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala | 71 |
1 files changed, 50 insertions, 21 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala index e792c44..1401dee 100644 --- a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -1,18 +1,19 @@ package com.softwaremill.sttp.okhttp.monix import java.nio.ByteBuffer +import java.util.concurrent.ArrayBlockingQueue -import com.softwaremill.sttp._ +import com.softwaremill.sttp.{SttpHandler, _} import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler import monix.eval.Task -import monix.execution.{Cancelable, Scheduler} -import monix.reactive.{Consumer, Observable} +import monix.execution.Ack.Continue +import monix.execution.{Ack, Cancelable, Scheduler} +import monix.reactive.Observable +import monix.reactive.observers.Subscriber import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} import okio.BufferedSink -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.language.higherKinds +import scala.concurrent.Future import scala.util.{Failure, Success, Try} /** @@ -23,23 +24,11 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)( extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client, TaskMonad) { - private lazy val io = Scheduler.io("sttp-monix-io") - override def streamToRequestBody( stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { - override def writeTo(sink: BufferedSink): Unit = { - val f = stream - .consumeWith( - Consumer.foreach(chunk => sink.write(chunk.array())) - ) - .runAsync(io) - - // We could safely block until the observable is consumed because OkHttp execute - // this method asynchronous in another ThreadPool. - Await.ready(f, Duration.Inf) - } - + override def writeTo(sink: BufferedSink): Unit = + toIterable(stream) map (_.array()) foreach sink.write override def contentType(): MediaType = null }) @@ -47,12 +36,52 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)( res: okhttp3.Response): Try[Observable[ByteBuffer]] = Success( Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap)) + + private def toIterable[T](observable: Observable[T])( + implicit s: Scheduler): Iterable[T] = + new Iterable[T] { + override def iterator: Iterator[T] = new Iterator[T] { + case object Completed extends Exception + + val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1) + + observable.executeWithFork.subscribe(new Subscriber[T] { + override implicit def scheduler: Scheduler = s + + override def onError(ex: Throwable): Unit = { + blockingQueue.put(Left(ex)) + } + + override def onComplete(): Unit = { + blockingQueue.put(Left(Completed)) + } + + override def onNext(elem: T): Future[Ack] = { + blockingQueue.put(Right(elem)) + Continue + } + }) + + var value: T = _ + + override def hasNext: Boolean = + blockingQueue.take() match { + case Left(Completed) => false + case Right(elem) => + value = elem + true + case Left(ex) => throw ex + } + + override def next(): T = value + } + } } object OkHttpMonixClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient())( implicit s: Scheduler = Scheduler.Implicits.global) - : OkHttpMonixClientHandler = + : SttpHandler[Task, Observable[ByteBuffer]] = new OkHttpMonixClientHandler(okhttpClient)(s) } |