diff options
author | Adam Warski <adam@warski.org> | 2018-05-18 15:52:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-18 15:52:12 +0200 |
commit | 588395d018c258eb74f60ad95bad706698bdf915 (patch) | |
tree | 2c4ee642d8efb9e6785da3fe8a7decba507329ab /okhttp-backend | |
parent | 96ff655f906f2e3f4e9ba906c42e96506f4668b9 (diff) | |
parent | 5980017ece9be1ebf30775e5babf81e0e2f1fcd9 (diff) | |
download | sttp-588395d018c258eb74f60ad95bad706698bdf915.tar.gz sttp-588395d018c258eb74f60ad95bad706698bdf915.tar.bz2 sttp-588395d018c258eb74f60ad95bad706698bdf915.zip |
Merge pull request #93 from guymers/scalajs-1
Extract MonadAsyncError implementations
Diffstat (limited to 'okhttp-backend')
-rw-r--r-- | okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala | 35 |
1 files changed, 5 insertions, 30 deletions
diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala index bda8959..332d8d0 100644 --- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala +++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala @@ -3,21 +3,22 @@ package com.softwaremill.sttp.okhttp.monix import java.nio.ByteBuffer import java.util.concurrent.ArrayBlockingQueue +import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError import com.softwaremill.sttp.{SttpBackend, _} import com.softwaremill.sttp.okhttp.{OkHttpAsyncBackend, OkHttpBackend} import monix.eval.Task import monix.execution.Ack.Continue -import monix.execution.{Ack, Cancelable, Scheduler} +import monix.execution.{Ack, Scheduler} import monix.reactive.Observable import monix.reactive.observers.Subscriber import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} import okio.BufferedSink import scala.concurrent.Future -import scala.util.{Failure, Success, Try} +import scala.util.{Success, Try} class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler) - extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) { + extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonadAsyncError, closeClient) { override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { @@ -40,7 +41,7 @@ class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(im val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1) - observable.executeWithFork.subscribe(new Subscriber[T] { + observable.executeAsync.subscribe(new Subscriber[T] { override implicit def scheduler: Scheduler = s override def onError(ex: Throwable): Unit = { @@ -86,29 +87,3 @@ object OkHttpMonixBackend { implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] = OkHttpMonixBackend(client, closeClient = false)(s) } - -private[monix] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.now(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { (_, cb) => - register { - case Left(t) => cb(Failure(t)) - case Right(t) => cb(Success(t)) - } - - Cancelable.empty - } - - override def error[T](t: Throwable): Task[T] = Task.raiseError(t) - - override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = - rt.onErrorRecoverWith { - case t => h(t) - } -} |