aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmar Alejandro Mainegra Sarduy <omainegra@gmail.com>2017-08-09 16:46:09 -0400
committerOmar Alejandro Mainegra Sarduy <omainegra@gmail.com>2017-08-09 16:46:09 -0400
commitc6c1f5a34930946e8ab4e9248b9255ce2e1464fe (patch)
tree0343a00d2e56ddda1aa0b4c8f97300fdd559229c
parentaf41cd375a1d9d2b2e467a09f31d31f4b8fa44d9 (diff)
downloadsttp-c6c1f5a34930946e8ab4e9248b9255ce2e1464fe.tar.gz
sttp-c6c1f5a34930946e8ab4e9248b9255ce2e1464fe.tar.bz2
sttp-c6c1f5a34930946e8ab4e9248b9255ce2e1464fe.zip
Wait until the stream is sent to sink
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala11
1 files changed, 9 insertions, 2 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
index ecfb30e..e792c44 100644
--- a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -10,6 +10,8 @@ import monix.reactive.{Consumer, Observable}
import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
import okio.BufferedSink
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
import scala.language.higherKinds
import scala.util.{Failure, Success, Try}
@@ -26,13 +28,18 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)(
override def streamToRequestBody(
stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
Some(new OkHttpRequestBody() {
- override def writeTo(sink: BufferedSink): Unit =
- stream
+ override def writeTo(sink: BufferedSink): Unit = {
+ val f = stream
.consumeWith(
Consumer.foreach(chunk => sink.write(chunk.array()))
)
.runAsync(io)
+ // We could safely block until the observable is consumed because OkHttp execute
+ // this method asynchronous in another ThreadPool.
+ Await.ready(f, Duration.Inf)
+ }
+
override def contentType(): MediaType = null
})