From 40288a1aaddfc27e141771371d69122ce222a8d0 Mon Sep 17 00:00:00 2001 From: Sam Guymer Date: Thu, 17 May 2018 20:07:04 +1000 Subject: Extract MonadAsyncError implementations Extract MonadAsyncError implementations into their own projects to allow reuse by multiple backends. --- .../sttp/okhttp/monix/OkHttpMonixBackend.scala | 35 ++++------------------ 1 file changed, 5 insertions(+), 30 deletions(-) (limited to 'okhttp-backend/monix/src/main') diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala index bda8959..332d8d0 100644 --- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala +++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala @@ -3,21 +3,22 @@ package com.softwaremill.sttp.okhttp.monix import java.nio.ByteBuffer import java.util.concurrent.ArrayBlockingQueue +import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError import com.softwaremill.sttp.{SttpBackend, _} import com.softwaremill.sttp.okhttp.{OkHttpAsyncBackend, OkHttpBackend} import monix.eval.Task import monix.execution.Ack.Continue -import monix.execution.{Ack, Cancelable, Scheduler} +import monix.execution.{Ack, Scheduler} import monix.reactive.Observable import monix.reactive.observers.Subscriber import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} import okio.BufferedSink import scala.concurrent.Future -import scala.util.{Failure, Success, Try} +import scala.util.{Success, Try} class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler) - extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) { + extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonadAsyncError, closeClient) { override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = Some(new OkHttpRequestBody() { @@ -40,7 +41,7 @@ class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(im val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1) - observable.executeWithFork.subscribe(new Subscriber[T] { + observable.executeAsync.subscribe(new Subscriber[T] { override implicit def scheduler: Scheduler = s override def onError(ex: Throwable): Unit = { @@ -86,29 +87,3 @@ object OkHttpMonixBackend { implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] = OkHttpMonixBackend(client, closeClient = false)(s) } - -private[monix] object TaskMonad extends MonadAsyncError[Task] { - override def unit[T](t: T): Task[T] = Task.now(t) - - override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f) - - override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] = - fa.flatMap(f) - - override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = - Task.async { (_, cb) => - register { - case Left(t) => cb(Failure(t)) - case Right(t) => cb(Success(t)) - } - - Cancelable.empty - } - - override def error[T](t: Throwable): Task[T] = Task.raiseError(t) - - override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] = - rt.onErrorRecoverWith { - case t => h(t) - } -} -- cgit v1.2.3