diff options
author | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-04 18:12:44 -0400 |
---|---|---|
committer | Omar Alejandro Mainegra Sarduy <omainegra@gmail.com> | 2017-08-04 18:12:44 -0400 |
commit | 1a9b0b61e18c9b57fa52cebeb80fb1caddf58186 (patch) | |
tree | bb7531004145866a36516b8a8a7984739373db3c | |
parent | 489e591672257d19b3a07198d4d6c9e21be601c7 (diff) | |
download | sttp-1a9b0b61e18c9b57fa52cebeb80fb1caddf58186.tar.gz sttp-1a9b0b61e18c9b57fa52cebeb80fb1caddf58186.tar.bz2 sttp-1a9b0b61e18c9b57fa52cebeb80fb1caddf58186.zip |
Refectory: Abstract async handlers over MonadAsynError
-rw-r--r-- | okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala | 55 |
1 files changed, 33 insertions, 22 deletions
diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala index f57487f..af896d4 100644 --- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala +++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala @@ -19,7 +19,7 @@ import okhttp3.{ import okio.{BufferedSink, Okio} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.language.higherKinds import scala.util.{Failure, Try} @@ -61,13 +61,14 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) }) case PathBody(b) => Some(OkHttpRequestBody.create(null, b.toFile)) case SerializableBody(f, t) => setBody(f(t)) - case StreamBody(s) => None + case StreamBody(s) => streamToRequestBody(s) } } private[okhttp] def readResponse[T]( res: OkHttpResponse, responseAs: ResponseAs[T, S]): R[Response[T]] = { + val body = responseHandler(res).handle(responseAs, responseMonad) val headers = res @@ -87,10 +88,15 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient) case ResponseAsString(encoding) => Try(res.body().source().readString(Charset.forName(encoding))) case ResponseAsByteArray => Try(res.body().bytes()) - case ResponseAsStream() => - Failure(new IllegalStateException("Streaming isn't supported")) + case ras @ ResponseAsStream() => + responseBodyToStream(res).map(ras.responseIsStream) } } + + def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None + + def responseBodyToStream(res: OkHttpResponse): Try[S] = + Failure(new IllegalStateException("Streaming isn't supported")) } class OkHttpSyncClientHandler private (client: OkHttpClient) @@ -106,37 +112,42 @@ class OkHttpSyncClientHandler private (client: OkHttpClient) object OkHttpSyncClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient()) - : SttpHandler[Id, Nothing] = + : OkHttpSyncClientHandler = new OkHttpSyncClientHandler(okhttpClient) } -class OkHttpFutureClientHandler private (client: OkHttpClient)( - implicit ec: ExecutionContext) - extends OkHttpClientHandler[Future, Nothing](client) { - - override def send[T](r: Request[T, Nothing]): Future[Response[T]] = { +abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient, + rm: MonadAsyncError[R]) + extends OkHttpClientHandler[R, S](client) { + override def send[T](r: Request[T, S]): R[Response[T]] = { val request = convertRequest(r) - val promise = Promise[Future[Response[T]]]() - client - .newCall(request) - .enqueue(new Callback { - override def onFailure(call: Call, e: IOException): Unit = - promise.failure(e) + rm.flatten(rm.async[R[Response[T]]] { cb => + def success(r: R[Response[T]]) = cb(Right(r)) + def error(t: Throwable) = cb(Left(t)) - override def onResponse(call: Call, response: OkHttpResponse): Unit = - promise.success(readResponse(response, r.responseAs)) - }) + client + .newCall(request) + .enqueue(new Callback { + override def onFailure(call: Call, e: IOException): Unit = + error(e) - responseMonad.flatten(promise.future) + override def onResponse(call: Call, response: OkHttpResponse): Unit = + success(readResponse(response, r.responseAs)) + }) + }) } - override def responseMonad: MonadError[Future] = new FutureMonad + override def responseMonad: MonadError[R] = rm } +class OkHttpFutureClientHandler private (client: OkHttpClient)( + implicit ec: ExecutionContext) + extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {} + object OkHttpFutureClientHandler { def apply(okhttpClient: OkHttpClient = new OkHttpClient())( implicit ec: ExecutionContext = ExecutionContext.Implicits.global) - : SttpHandler[Future, Nothing] = + : OkHttpFutureClientHandler = new OkHttpFutureClientHandler(okhttpClient) } |