diff options
Diffstat (limited to 'okhttp-backend/monix/src/main/scala/com/softwaremill')
-rw-r--r-- | okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala | 43 |
1 files changed, 16 insertions, 27 deletions
diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala index f579ff5..bda8959 100644 --- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala +++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala @@ -16,30 +16,24 @@ import okio.BufferedSink import scala.concurrent.Future import scala.util.{Failure, Success, Try} -class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)( - implicit s: Scheduler) - extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, - TaskMonad, - closeClient) { - - override def streamToRequestBody( - stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = +class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler) + extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) { + + override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { override def writeTo(sink: BufferedSink): Unit = toIterable(stream) map (_.array()) foreach sink.write override def contentType(): MediaType = null }) - override def responseBodyToStream( - res: okhttp3.Response): Try[Observable[ByteBuffer]] = + override def responseBodyToStream(res: okhttp3.Response): Try[Observable[ByteBuffer]] = Success( Observable .fromInputStream(res.body().byteStream()) .map(ByteBuffer.wrap) .doAfterTerminate(_ => res.close())) - private def toIterable[T](observable: Observable[T])( - implicit s: Scheduler): Iterable[T] = + private def toIterable[T](observable: Observable[T])(implicit s: Scheduler): Iterable[T] = new Iterable[T] { override def iterator: Iterator[T] = new Iterator[T] { case object Completed extends Exception @@ -85,15 +79,11 @@ object OkHttpMonixBackend { new FollowRedirectsBackend(new OkHttpMonixBackend(client, closeClient)(s)) def apply(options: SttpBackendOptions = SttpBackendOptions.Default)( - implicit s: Scheduler = Scheduler.Implicits.global) - : SttpBackend[Task, Observable[ByteBuffer]] = - OkHttpMonixBackend( - OkHttpBackend.defaultClient(DefaultReadTimeout.toMillis, options), - closeClient = true)(s) - - def usingClient(client: OkHttpClient)(implicit s: Scheduler = - Scheduler.Implicits.global) - : SttpBackend[Task, Observable[ByteBuffer]] = + implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] = + OkHttpMonixBackend(OkHttpBackend.defaultClient(DefaultReadTimeout.toMillis, options), closeClient = true)(s) + + def usingClient(client: OkHttpClient)( + implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] = OkHttpMonixBackend(client, closeClient = false)(s) } @@ -105,8 +95,7 @@ private[monix] object TaskMonad extends MonadAsyncError[Task] { override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = fa.flatMap(f) - override def async[T]( - register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = Task.async { (_, cb) => register { case Left(t) => cb(Failure(t)) @@ -118,8 +107,8 @@ private[monix] object TaskMonad extends MonadAsyncError[Task] { override def error[T](t: Throwable): Task[T] = Task.raiseError(t) - override protected def handleWrappedError[T](rt: Task[T])( - h: PartialFunction[Throwable, Task[T]]): Task[T] = rt.onErrorRecoverWith { - case t => h(t) - } + override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = + rt.onErrorRecoverWith { + case t => h(t) + } } |