From c6c1f5a34930946e8ab4e9248b9255ce2e1464fe Mon Sep 17 00:00:00 2001 From: Omar Alejandro Mainegra Sarduy Date: Wed, 9 Aug 2017 16:46:09 -0400 Subject: Wait until the stream is sent to sink --- .../sttp/okhttp/monix/OkHttpMonixClientHandler.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'okhttp-client-handler') diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala index ecfb30e..e792c44 100644 --- a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -10,6 +10,8 @@ import monix.reactive.{Consumer, Observable} import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} import okio.BufferedSink +import scala.concurrent.Await +import scala.concurrent.duration.Duration import scala.language.higherKinds import scala.util.{Failure, Success, Try} @@ -26,13 +28,18 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)( override def streamToRequestBody( stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { - override def writeTo(sink: BufferedSink): Unit = - stream + override def writeTo(sink: BufferedSink): Unit = { + val f = stream .consumeWith( Consumer.foreach(chunk => sink.write(chunk.array())) ) .runAsync(io) + // We could safely block until the observable is consumed because OkHttp execute + // this method asynchronous in another ThreadPool. + Await.ready(f, Duration.Inf) + } + override def contentType(): MediaType = null }) -- cgit v1.2.3