diff options
author | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-09 16:46:09 -0400 |
---|---|---|
committer | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-09 16:46:09 -0400 |
commit | c6c1f5a34930946e8ab4e9248b9255ce2e1464fe (patch) | |
tree | 0343a00d2e56ddda1aa0b4c8f97300fdd559229c /okhttp-client-handler | |
parent | af41cd375a1d9d2b2e467a09f31d31f4b8fa44d9 (diff) | |
download | sttp-c6c1f5a34930946e8ab4e9248b9255ce2e1464fe.tar.gz sttp-c6c1f5a34930946e8ab4e9248b9255ce2e1464fe.tar.bz2 sttp-c6c1f5a34930946e8ab4e9248b9255ce2e1464fe.zip |
Wait until the stream is sent to sink
Diffstat (limited to 'okhttp-client-handler')
-rw-r--r-- | okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala index ecfb30e..e792c44 100644 --- a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -10,6 +10,8 @@ import monix.reactive.{Consumer, Observable} import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} import okio.BufferedSink +import scala.concurrent.Await +import scala.concurrent.duration.Duration import scala.language.higherKinds import scala.util.{Failure, Success, Try} @@ -26,13 +28,18 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)( override def streamToRequestBody( stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { - override def writeTo(sink: BufferedSink): Unit = - stream + override def writeTo(sink: BufferedSink): Unit = { + val f = stream .consumeWith( Consumer.foreach(chunk => sink.write(chunk.array())) ) .runAsync(io) + // We could safely block until the observable is consumed because OkHttp execute + // this method asynchronous in another ThreadPool. + Await.ready(f, Duration.Inf) + } + override def contentType(): MediaType = null }) |