aboutsummaryrefslogtreecommitdiff
path: root/okhttp-backend
diff options
context:
space:
mode:
authorAdam Warski <adam@warski.org>2018-05-18 15:52:12 +0200
committerGitHub <noreply@github.com>2018-05-18 15:52:12 +0200
commit588395d018c258eb74f60ad95bad706698bdf915 (patch)
tree2c4ee642d8efb9e6785da3fe8a7decba507329ab /okhttp-backend
parent96ff655f906f2e3f4e9ba906c42e96506f4668b9 (diff)
parent5980017ece9be1ebf30775e5babf81e0e2f1fcd9 (diff)
downloadsttp-588395d018c258eb74f60ad95bad706698bdf915.tar.gz
sttp-588395d018c258eb74f60ad95bad706698bdf915.tar.bz2
sttp-588395d018c258eb74f60ad95bad706698bdf915.zip
Merge pull request #93 from guymers/scalajs-1
Extract MonadAsyncError implementations
Diffstat (limited to 'okhttp-backend')
-rw-r--r--okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala35
1 files changed, 5 insertions, 30 deletions
diff --git a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
index bda8959..332d8d0 100644
--- a/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
+++ b/okhttp-backend/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixBackend.scala
@@ -3,21 +3,22 @@ package com.softwaremill.sttp.okhttp.monix
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue
+import com.softwaremill.sttp.impl.monix.TaskMonadAsyncError
import com.softwaremill.sttp.{SttpBackend, _}
import com.softwaremill.sttp.okhttp.{OkHttpAsyncBackend, OkHttpBackend}
import monix.eval.Task
import monix.execution.Ack.Continue
-import monix.execution.{Ack, Cancelable, Scheduler}
+import monix.execution.{Ack, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
import okio.BufferedSink
import scala.concurrent.Future
-import scala.util.{Failure, Success, Try}
+import scala.util.{Success, Try}
class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(implicit s: Scheduler)
- extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonad, closeClient) {
+ extends OkHttpAsyncBackend[Task, Observable[ByteBuffer]](client, TaskMonadAsyncError, closeClient) {
override def streamToRequestBody(stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
Some(new OkHttpRequestBody() {
@@ -40,7 +41,7 @@ class OkHttpMonixBackend private (client: OkHttpClient, closeClient: Boolean)(im
val blockingQueue = new ArrayBlockingQueue[Either[Throwable, T]](1)
- observable.executeWithFork.subscribe(new Subscriber[T] {
+ observable.executeAsync.subscribe(new Subscriber[T] {
override implicit def scheduler: Scheduler = s
override def onError(ex: Throwable): Unit = {
@@ -86,29 +87,3 @@ object OkHttpMonixBackend {
implicit s: Scheduler = Scheduler.Implicits.global): SttpBackend[Task, Observable[ByteBuffer]] =
OkHttpMonixBackend(client, closeClient = false)(s)
}
-
-private[monix] object TaskMonad extends MonadAsyncError[Task] {
- override def unit[T](t: T): Task[T] = Task.now(t)
-
- override def map[T, T2](fa: Task[T])(f: (T) => T2): Task[T2] = fa.map(f)
-
- override def flatMap[T, T2](fa: Task[T])(f: (T) => Task[T2]): Task[T2] =
- fa.flatMap(f)
-
- override def async[T](register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
- Task.async { (_, cb) =>
- register {
- case Left(t) => cb(Failure(t))
- case Right(t) => cb(Success(t))
- }
-
- Cancelable.empty
- }
-
- override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
-
- override protected def handleWrappedError[T](rt: Task[T])(h: PartialFunction[Throwable, Task[T]]): Task[T] =
- rt.onErrorRecoverWith {
- case t => h(t)
- }
-}