aboutsummaryrefslogtreecommitdiff
path: root/okhttp-client-handler
diff options
context:
space:
mode:
Diffstat (limited to 'okhttp-client-handler')
-rw-r--r--okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala79
-rw-r--r--okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala61
2 files changed, 115 insertions, 25 deletions
diff --git a/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
new file mode 100644
index 0000000..e792c44
--- /dev/null
+++ b/okhttp-client-handler/monix/src/main/scala/com/softwaremill/sttp/okhttp/monix/OkHttpMonixClientHandler.scala
@@ -0,0 +1,79 @@
+package com.softwaremill.sttp.okhttp.monix
+
+import java.nio.ByteBuffer
+
+import com.softwaremill.sttp._
+import com.softwaremill.sttp.okhttp.OkHttpAsyncClientHandler
+import monix.eval.Task
+import monix.execution.{Cancelable, Scheduler}
+import monix.reactive.{Consumer, Observable}
+import okhttp3.{MediaType, OkHttpClient, RequestBody => OkHttpRequestBody}
+import okio.BufferedSink
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.language.higherKinds
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Created by omainegra on 8/4/17.
+ */
+class OkHttpMonixClientHandler private (client: OkHttpClient)(
+ implicit s: Scheduler)
+ extends OkHttpAsyncClientHandler[Task, Observable[ByteBuffer]](client,
+ TaskMonad) {
+
+ private lazy val io = Scheduler.io("sttp-monix-io")
+
+ override def streamToRequestBody(
+ stream: Observable[ByteBuffer]): Option[OkHttpRequestBody] =
+ Some(new OkHttpRequestBody() {
+ override def writeTo(sink: BufferedSink): Unit = {
+ val f = stream
+ .consumeWith(
+ Consumer.foreach(chunk => sink.write(chunk.array()))
+ )
+ .runAsync(io)
+
+ // We could safely block until the observable is consumed because OkHttp execute
+ // this method asynchronous in another ThreadPool.
+ Await.ready(f, Duration.Inf)
+ }
+
+ override def contentType(): MediaType = null
+ })
+
+ override def responseBodyToStream(
+ res: okhttp3.Response): Try[Observable[ByteBuffer]] =
+ Success(
+ Observable.fromInputStream(res.body().byteStream()).map(ByteBuffer.wrap))
+}
+
+object OkHttpMonixClientHandler {
+ def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
+ implicit s: Scheduler = Scheduler.Implicits.global)
+ : OkHttpMonixClientHandler =
+ new OkHttpMonixClientHandler(okhttpClient)(s)
+}
+
+private[monix] object TaskMonad extends MonadAsyncError[Task] {
+ override def unit[T](t: T): Task[T] = Task.now(t)
+
+ override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
+
+ override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
+ fa.flatMap(f)
+
+ override def async[T](
+ register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
+ Task.async { (_, cb) =>
+ register {
+ case Left(t) => cb(Failure(t))
+ case Right(t) => cb(Success(t))
+ }
+
+ Cancelable.empty
+ }
+
+ override def error[T](t: Throwable): Task[T] = Task.raiseError(t)
+}
diff --git a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
index f7c4466..8f21eb0 100644
--- a/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
+++ b/okhttp-client-handler/src/main/scala/com/softwaremill/sttp/okhttp/OkHttpClientHandler.scala
@@ -19,7 +19,7 @@ import okhttp3.{
import okio.{BufferedSink, Okio}
import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.{Failure, Try}
@@ -61,13 +61,14 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
override def contentType(): MediaType = null
})
case PathBody(b, _) => Some(OkHttpRequestBody.create(null, b.toFile))
- case StreamBody(s) => None
+ case StreamBody(s) => streamToRequestBody(s)
}
}
private[okhttp] def readResponse[T](
res: OkHttpResponse,
responseAs: ResponseAs[T, S]): R[Response[T]] = {
+
val body = responseHandler(res).handle(responseAs, responseMonad)
val headers = res
@@ -87,12 +88,17 @@ abstract class OkHttpClientHandler[R[_], S](client: OkHttpClient)
case ResponseAsString(encoding) =>
Try(res.body().source().readString(Charset.forName(encoding)))
case ResponseAsByteArray => Try(res.body().bytes())
- case ResponseAsStream() =>
- Failure(new IllegalStateException("Streaming isn't supported"))
+ case ras @ ResponseAsStream() =>
+ responseBodyToStream(res).map(ras.responseIsStream)
case ResponseAsFile(file, overwrite) =>
Try(ResponseAs.saveFile(file, res.body().byteStream(), overwrite))
}
}
+
+ def streamToRequestBody(stream: S): Option[OkHttpRequestBody] = None
+
+ def responseBodyToStream(res: OkHttpResponse): Try[S] =
+ Failure(new IllegalStateException("Streaming isn't supported"))
}
class OkHttpSyncClientHandler private (client: OkHttpClient)
@@ -108,38 +114,43 @@ class OkHttpSyncClientHandler private (client: OkHttpClient)
object OkHttpSyncClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())
- : SttpHandler[Id, Nothing] =
+ : OkHttpSyncClientHandler =
new OkHttpSyncClientHandler(okhttpClient)
}
-class OkHttpFutureClientHandler private (client: OkHttpClient)(
- implicit ec: ExecutionContext)
- extends OkHttpClientHandler[Future, Nothing](client) {
-
- override def send[T](r: Request[T, Nothing]): Future[Response[T]] = {
+abstract class OkHttpAsyncClientHandler[R[_], S](client: OkHttpClient,
+ rm: MonadAsyncError[R])
+ extends OkHttpClientHandler[R, S](client) {
+ override def send[T](r: Request[T, S]): R[Response[T]] = {
val request = convertRequest(r)
- val promise = Promise[Future[Response[T]]]()
-
- client
- .newCall(request)
- .enqueue(new Callback {
- override def onFailure(call: Call, e: IOException): Unit =
- promise.failure(e)
- override def onResponse(call: Call, response: OkHttpResponse): Unit =
- try promise.success(readResponse(response, r.responseAs))
- catch { case e: Exception => promise.failure(e) }
- })
-
- responseMonad.flatten(promise.future)
+ rm.flatten(rm.async[R[Response[T]]] { cb =>
+ def success(r: R[Response[T]]) = cb(Right(r))
+ def error(t: Throwable) = cb(Left(t))
+
+ client
+ .newCall(request)
+ .enqueue(new Callback {
+ override def onFailure(call: Call, e: IOException): Unit =
+ error(e)
+
+ override def onResponse(call: Call, response: OkHttpResponse): Unit =
+ try success(readResponse(response, r.responseAs))
+ catch { case e: Exception => error(e) }
+ })
+ })
}
- override def responseMonad: MonadError[Future] = new FutureMonad
+ override def responseMonad: MonadError[R] = rm
}
+class OkHttpFutureClientHandler private (client: OkHttpClient)(
+ implicit ec: ExecutionContext)
+ extends OkHttpAsyncClientHandler[Future, Nothing](client, new FutureMonad) {}
+
object OkHttpFutureClientHandler {
def apply(okhttpClient: OkHttpClient = new OkHttpClient())(
implicit ec: ExecutionContext = ExecutionContext.Implicits.global)
- : SttpHandler[Future, Nothing] =
+ : OkHttpFutureClientHandler =
new OkHttpFutureClientHandler(okhttpClient)
}