From 1a9b0b61e18c9b57fa52cebeb80fb1caddf58186 Mon Sep 17 00:00:00 2001 From: Omar Alejandro Mainegra Sarduy Date: Fri, 4 Aug 2017 18:12:44 -0400 Subject: Refectory: Abstract async handlers over MonadAsynError --- .../sttp/okhttp/OkHttpClientHandler.scala | 55 +++++++++++++--------- 1 file changed, 33 insertions(+), 22 deletions(-) (limited to 'okhttp-client-handler') diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala index f57487f..af896d4 100644 --- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala +++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala @@ -19,7 +19,7 @@ import okhttp3.{ import okio.{BufferedSink, Okio} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds import scala.util.{Failure, Try} @@ -61,13 +61,14 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) }) case PathBody(b) => Some(OkHttpRequestBody.create(null, b.toFile)) case SerializableBody(f, t) => setBody(f(t)) - case StreamBody(s) => None + case StreamBody(s) => streamToRequestBody(s) } } private[okhttp] def readResponse[T]( res: OkHttpResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = responseHandler(res).handle(responseAs, responseMonad) val headers = res @@ -87,10 +88,15 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) case ResponseAsString(encoding) => Try(res.body().source().readString(Charset.forName(encoding))) case ResponseAsByteArray => Try(res.body().bytes()) - case ResponseAsStream() => - Failure(new IllegalStateException("Streaming isn't supported")) + case ras @ ResponseAsStream() => + responseBodyToStream(res).map(ras.responseIsStream) } } + + def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None + + def responseBodyToStream(res: OkHttpResponse): Try[S] = + Failure(new IllegalStateException("Streaming isn't supported")) } class OkHttpSyncClientHandler private (client: OkHttpClient) @@ -106,37 +112,42 @@ class OkHttpSyncClientHandler private (client: OkHttpClient) object OkHttpSyncClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : SttpHandler[Id, Nothing] = + : OkHttpSyncClientHandler = new OkHttpSyncClientHandler(okhttpClient) } -class OkHttpFutureClientHandler private (client: OkHttpClient)( - implicit ec: ExecutionContext) - extends OkHttpClientHandler[Future, Nothing](client) { - - override def send[T](r: Request[T, Nothing]): Future[Response[T]] = { +abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient, + rm: MonadAsyncError[R]) + extends OkHttpClientHandler[R, S](client) { + override def send[T](r: Request[T, S]): R[Response[T]] = { val request = convertRequest(r) - val promise = Promise[Future[Response[T]]]() - client - .newCall(request) - .enqueue(new Callback { - override def onFailure(call: Call, e: IOException): Unit = - promise.failure(e) + rm.flatten(rm.async[R[Response[T]]] { cb => + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) - override def onResponse(call: Call, response: OkHttpResponse): Unit = - promise.success(readResponse(response, r.responseAs)) - }) + client + .newCall(request) + .enqueue(new Callback { + override def onFailure(call: Call, e: IOException): Unit = + error(e) - responseMonad.flatten(promise.future) + override def onResponse(call: Call, response: OkHttpResponse): Unit = + success(readResponse(response, r.responseAs)) + }) + }) } - override def responseMonad: MonadError[Future] = new FutureMonad + override def responseMonad: MonadError[R] = rm } +class OkHttpFutureClientHandler private (client: OkHttpClient)( + implicit ec: ExecutionContext) + extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {} + object OkHttpFutureClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient())( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = + : OkHttpFutureClientHandler = new OkHttpFutureClientHandler(okhttpClient) } -- cgit v1.2.3