aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-01-26 16:26:12 -0800
committerLuciano Resende <lresende@apache.org>2017-01-26 17:46:02 -0800
commit4843923e5f84e714bb48fd9b253ecbf5ca6ebf28 (patch)
tree45983106a0c10fda437e6c7f6451d1c7c08a972e
parent6573a24769b9c4fd1da55f460272a5d1ce0c6150 (diff)
downloadtoree-gateway-4843923e5f84e714bb48fd9b253ecbf5ca6ebf28.tar.gz
toree-gateway-4843923e5f84e714bb48fd9b253ecbf5ca6ebf28.tar.bz2
toree-gateway-4843923e5f84e714bb48fd9b253ecbf5ca6ebf28.zip
Implement timeout and recovery for non-success responses
Closes #1
-rw-r--r--src/main/scala/org/apache/toree/gateway/ToreeGateway.scala60
1 files changed, 38 insertions, 22 deletions
diff --git a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
index 22fbd4a..786a381 100644
--- a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
+++ b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
@@ -27,8 +27,9 @@ import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult, StreamContent}
import py4j.GatewayServer
-import scala.concurrent.{Await, Promise}
-import scala.concurrent.duration.Duration
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json._
@@ -46,12 +47,7 @@ class ToreeGateway(client: SparkKernelClient) {
}
private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = {
- /*
- if(! promise.isCompleted) {
- log.info(s"Successful code completion")
- promise.complete(Try("done"))
- }
- */
+ promise.success("")
}
private def handleError(promise:Promise[String], reply:ExecuteReplyError) {
@@ -65,28 +61,48 @@ class ToreeGateway(client: SparkKernelClient) {
promise.success(content.text)
}
+ val ResponseTimeout = 1.seconds
+ val EvalTimeout = 10.seconds
+
+ private def recoverTimeout[A](future: Future[A], timeout: FiniteDuration, default: A): Future[A] = try {
+ Await.ready(future, timeout)
+ } catch {
+ case ex: TimeoutException =>
+ Future.successful(default)
+ }
+
def eval(code: String): Object = {
- val promise = Promise[String]
- try {
- val exRes: DeferredExecution = client.execute(code)
+ val statusPromise = Promise[String]
+ val responsePromise = Promise[String]
+ client.execute(code)
.onResult(executeResult => {
- handleResult(promise, executeResult)
+ handleResult(responsePromise, executeResult)
}).onError(executeReplyError =>{
- handleError(promise, executeReplyError)
- }).onSuccess(executeReplyOk => {
- handleSuccess(promise, executeReplyOk)
- }).onStream(streamResult => {
- handleStream(promise, streamResult)
- })
+ handleError(statusPromise, executeReplyError)
+ }).onStream(streamResult => {
+ handleStream(responsePromise, streamResult)
+ }).onSuccess(executeReplyOk => {
+ handleSuccess(statusPromise, executeReplyOk)
+ })
+
+ val successFuture: Future[String] = statusPromise.future
+ val responseFuture: Future[String] =
+ recoverTimeout(responsePromise.future, ResponseTimeout, "")
+
+ val aggregateFuture: Future[String] = for (
+ success <- successFuture;
+ result <- responseFuture
+ ) yield {
+ success + result
+ }
+ try {
+ Await.result(aggregateFuture, EvalTimeout)
} catch {
case t : Throwable => {
- log.info("Error submitting request: " + t.getMessage, t)
- promise.success("Error submitting request: " + t.getMessage)
+ "Error submitting request: " + t.getMessage
}
}
-
- Await.result(promise.future, Duration.Inf)
}
}