diff options
author | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-04 18:14:28 -0400 |
---|---|---|
committer | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-04 18:14:28 -0400 |
commit | b217185d8bd4297aef1e91b34f295620a1832e0b (patch) | |
tree | 571c08d3fb7e3cd328b0b91f60a981eea9ab203c /okhttp-client-handler | |
parent | 1a9b0b61e18c9b57fa52cebeb80fb1caddf58186 (diff) | |
download | sttp-b217185d8bd4297aef1e91b34f295620a1832e0b.tar.gz sttp-b217185d8bd4297aef1e91b34f295620a1832e0b.tar.bz2 sttp-b217185d8bd4297aef1e91b34f295620a1832e0b.zip |
Add new Monix handler
Diffstat (limited to 'okhttp-client-handler')
-rw-r--r-- | okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala new file mode 100644 index 0000000..ea4af14 --- /dev/null +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -0,0 +1,72 @@ +package com.softwaremill.sttp.okhttp.monix + +import java.nio.ByteBuffer + +import com.softwaremill.sttp._ +import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler +import monix.eval.{Callback, Task} +import monix.execution.cancelables.AssignableCancelable +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.observers.Subscriber +import monix.reactive.{Consumer, Observable} +import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} +import okio.{BufferedSink, Okio} + +import scala.language.higherKinds +import scala.util.{Failure, Success, Try} + +/** + * Created by omainegra on 8/4/17. + */ +class OkHttpMonixClientHandler private (client: OkHttpClient)( + implicit s: Scheduler) + extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client, + TaskMonad) { + + override def streamToRequestBody( + stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = + Some(new OkHttpRequestBody() { + override def writeTo(sink: BufferedSink): Unit = + stream + .consumeWith( + Consumer.foreach(chunck => sink.write(chunck.array())) + ) + .runAsync + + override def contentType(): MediaType = null + }) + + override def responseBodyToStream( + res: okhttp3.Response): Try[Observable[ByteBuffer]] = + Success( + Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap)) +} + +object OkHttpMonixClientHandler { + def apply(okhttpClient: OkHttpClient = new OkHttpClient())( + implicit s: Scheduler = Scheduler.Implicits.global) + : OkHttpMonixClientHandler = + new OkHttpMonixClientHandler(okhttpClient)(s) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} |