aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala11
1 files changed, 9 insertions, 2 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
index ecfb30e..e792c44 100644
--- a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -10,6 +10,8 @@ import monix.reactive.{Consumer, Observable}
import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
import okio.BufferedSink
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
import scala.language.higherKinds
import scala.util.{Failure, Success, Try}
@@ -26,13 +28,18 @@ class OkHttpMonixClientHandler private (client: OkHttpClient)(
override def streamToRequestBody(
stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
Some(new OkHttpRequestBody() {
- override def writeTo(sink: BufferedSink): Unit =
- stream
+ override def writeTo(sink: BufferedSink): Unit = {
+ val f = stream
.consumeWith(
Consumer.foreach(chunk => sink.write(chunk.array()))
)
.runAsync(io)
+ // We could safely block until the observable is consumed because OkHttp execute
+ // this method asynchronous in another ThreadPool.
+ Await.ready(f, Duration.Inf)
+ }
+
override def contentType(): MediaType = null
})