diff options
Diffstat (limited to 'okhttp-client-handler')
2 files changed, 115 insertions, 25 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala new file mode 100644 index 0000000..e792c44 --- /dev/null +++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala @@ -0,0 +1,79 @@ +package com.softwaremill.sttp.okhttp.monix + +import java.nio.ByteBuffer + +import com.softwaremill.sttp._ +import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler +import monix.eval.Task +import monix.execution.{Cancelable, Scheduler} +import monix.reactive.{Consumer, Observable} +import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody} +import okio.BufferedSink + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.language.higherKinds +import scala.util.{Failure, Success, Try} + +/** + * Created by omainegra on 8/4/17. + */ +class OkHttpMonixClientHandler private (client: OkHttpClient)( + implicit s: Scheduler) + extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client, + TaskMonad) { + + private lazy val io = Scheduler.io("sttp-monix-io") + + override def streamToRequestBody( + stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] = + Some(new OkHttpRequestBody() { + override def writeTo(sink: BufferedSink): Unit = { + val f = stream + .consumeWith( + Consumer.foreach(chunk => sink.write(chunk.array())) + ) + .runAsync(io) + + // We could safely block until the observable is consumed because OkHttp execute + // this method asynchronous in another ThreadPool. + Await.ready(f, Duration.Inf) + } + + override def contentType(): MediaType = null + }) + + override def responseBodyToStream( + res: okhttp3.Response): Try[Observable[ByteBuffer]] = + Success( + Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap)) +} + +object OkHttpMonixClientHandler { + def apply(okhttpClient: OkHttpClient = new OkHttpClient())( + implicit s: Scheduler = Scheduler.Implicits.global) + : OkHttpMonixClientHandler = + new OkHttpMonixClientHandler(okhttpClient)(s) +} + +private[monix] object TaskMonad extends MonadAsyncError[Task] { + override def unit[T](t: T): Task[T] = Task.now(t) + + override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f) + + override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] = + fa.flatMap(f) + + override def async[T]( + register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] = + Task.async { (_, cb) => + register { + case Left(t) => cb(Failure(t)) + case Right(t) => cb(Success(t)) + } + + Cancelable.empty + } + + override def error[T](t: Throwable): Task[T] = Task.raiseError(t) +} diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala index f7c4466..8f21eb0 100644 --- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala +++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala @@ -19,7 +19,7 @@ import okhttp3.{ import okio.{BufferedSink, Okio} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds import scala.util.{Failure, Try} @@ -61,13 +61,14 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) override def contentType(): MediaType = null }) case PathBody(b, _) => Some(OkHttpRequestBody.create(null, b.toFile)) - case StreamBody(s) => None + case StreamBody(s) => streamToRequestBody(s) } } private[okhttp] def readResponse[T]( res: OkHttpResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = responseHandler(res).handle(responseAs, responseMonad) val headers = res @@ -87,12 +88,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) case ResponseAsString(encoding) => Try(res.body().source().readString(Charset.forName(encoding))) case ResponseAsByteArray => Try(res.body().bytes()) - case ResponseAsStream() => - Failure(new IllegalStateException("Streaming isn't supported")) + case ras @ ResponseAsStream() => + responseBodyToStream(res).map(ras.responseIsStream) case ResponseAsFile(file, overwrite) => Try(ResponseAs.saveFile(file, res.body().byteStream(), overwrite)) } } + + def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None + + def responseBodyToStream(res: OkHttpResponse): Try[S] = + Failure(new IllegalStateException("Streaming isn't supported")) } class OkHttpSyncClientHandler private (client: OkHttpClient) @@ -108,38 +114,43 @@ class OkHttpSyncClientHandler private (client: OkHttpClient) object OkHttpSyncClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : SttpHandler[Id, Nothing] = + : OkHttpSyncClientHandler = new OkHttpSyncClientHandler(okhttpClient) } -class OkHttpFutureClientHandler private (client: OkHttpClient)( - implicit ec: ExecutionContext) - extends OkHttpClientHandler[Future, Nothing](client) { - - override def send[T](r: Request[T, Nothing]): Future[Response[T]] = { +abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient, + rm: MonadAsyncError[R]) + extends OkHttpClientHandler[R, S](client) { + override def send[T](r: Request[T, S]): R[Response[T]] = { val request = convertRequest(r) - val promise = Promise[Future[Response[T]]]() - - client - .newCall(request) - .enqueue(new Callback { - override def onFailure(call: Call, e: IOException): Unit = - promise.failure(e) - override def onResponse(call: Call, response: OkHttpResponse): Unit = - try promise.success(readResponse(response, r.responseAs)) - catch { case e: Exception => promise.failure(e) } - }) - - responseMonad.flatten(promise.future) + rm.flatten(rm.async[R[Response[T]]] { cb => + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) + + client + .newCall(request) + .enqueue(new Callback { + override def onFailure(call: Call, e: IOException): Unit = + error(e) + + override def onResponse(call: Call, response: OkHttpResponse): Unit = + try success(readResponse(response, r.responseAs)) + catch { case e: Exception => error(e) } + }) + }) } - override def responseMonad: MonadError[Future] = new FutureMonad + override def responseMonad: MonadError[R] = rm } +class OkHttpFutureClientHandler private (client: OkHttpClient)( + implicit ec: ExecutionContext) + extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {} + object OkHttpFutureClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient())( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = + : OkHttpFutureClientHandler = new OkHttpFutureClientHandler(okhttpClient) } |