aboutsummaryrefslogtreecommitdiff
path: root/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala')
-rw-r--r--okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala43
1 files changed, 16 insertions, 27 deletions
diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
index f579ff5..bda8959 100644
--- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
+++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
@@ -16,30 +16,24 @@ import okio.BufferedSink
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
-class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(
- implicit s: Scheduler)
- extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client,
- TaskMonad,
- closeClient) {
-
- override def streamToRequestBody(
- stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
+class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler)
+ extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) {
+
+ override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
Some(new OkHttpRequestBody() {
override def writeTo(sink: BufferedSink): Unit =
toIterable(stream) map (_.array()) foreach sink.write
override def contentType(): MediaType = null
})
- override def responseBodyToStream(
- res: okhttp3.Response): Try[Observable[ByteBuffer]] =
+ override def responseBodyToStream(res: okhttp3.Response): Try[Observable[ByteBuffer]] =
Success(
Observable
.fromInputStream(res.body().byteStream())
.map(ByteBuffer.wrap)
.doAfterTerminate(_ => res.close()))
- private def toIterable[T](observable: Observable[T])(
- implicit s: Scheduler): Iterable[T] =
+ private def toIterable[T](observable: Observable[T])(implicit s: Scheduler): Iterable[T] =
new Iterable[T] {
override def iterator: Iterator[T] = new Iterator[T] {
case object Completed extends Exception
@@ -85,15 +79,11 @@ object OkHttpMonixBackend {
new FollowRedirectsBackend(new OkHttpMonixBackend(client, closeClient)(s))
def apply(options: SttpBackendOptions = SttpBackendOptions.Default)(
- implicit s: Scheduler = Scheduler.Implicits.global)
- : SttpBackend[Task, Observable[ByteBuffer]] =
- OkHttpMonixBackend(
- OkHttpBackend.defaultClient(DefaultReadTimeout.toMillis, options),
- closeClient = true)(s)
-
- def usingClient(client: OkHttpClient)(implicit s: Scheduler =
- Scheduler.Implicits.global)
- : SttpBackend[Task, Observable[ByteBuffer]] =
+ implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] =
+ OkHttpMonixBackend(OkHttpBackend.defaultClient(DefaultReadTimeout.toMillis, options), closeClient = true)(s)
+
+ def usingClient(client: OkHttpClient)(
+ implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] =
OkHttpMonixBackend(client, closeClient = false)(s)
}
@@ -105,8 +95,7 @@ private[monix] object TaskMonad extends MonadAsyncError[Task] {
override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
fa.flatMap(f)
- override def async[T](
- register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
Task.async { (_, cb) =>
register {
case Left(t) => cb(Failure(t))
@@ -118,8 +107,8 @@ private[monix] object TaskMonad extends MonadAsyncError[Task] {
override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
- override protected def handleWrappedError[T](rt: Task[T])(
- h: PartialFunction[Throwable, Task[T]]): Task[T] = rt.onErrorRecoverWith {
- case t => h(t)
- }
+ override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
+ rt.onErrorRecoverWith {
+ case t => h(t)
+ }
}