From 4843923e5f84e714bb48fd9b253ecbf5ca6ebf28 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Thu, 26 Jan 2017 16:26:12 -0800 Subject: Implement timeout and recovery for non-success responses Closes #1 --- .../org/apache/toree/gateway/ToreeGateway.scala | 60 ++++++++++++++-------- 1 file changed, 38 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala index 22fbd4a..786a381 100644 --- a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala +++ b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala @@ -27,8 +27,9 @@ import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult, StreamContent} import py4j.GatewayServer -import scala.concurrent.{Await, Promise} -import scala.concurrent.duration.Duration +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ import org.slf4j.{Logger, LoggerFactory} import play.api.libs.json._ @@ -46,12 +47,7 @@ class ToreeGateway(client: SparkKernelClient) { } private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { - /* - if(! promise.isCompleted) { - log.info(s"Successful code completion") - promise.complete(Try("done")) - } - */ + promise.success("") } private def handleError(promise:Promise[String], reply:ExecuteReplyError) { @@ -65,28 +61,48 @@ class ToreeGateway(client: SparkKernelClient) { promise.success(content.text) } + val ResponseTimeout = 1.seconds + val EvalTimeout = 10.seconds + + private def recoverTimeout[A](future: Future[A], timeout: FiniteDuration, default: A): Future[A] = try { + Await.ready(future, timeout) + } catch { + case ex: TimeoutException => + Future.successful(default) + } + def eval(code: String): Object = { - val promise = Promise[String] - try { - val exRes: DeferredExecution = client.execute(code) + val statusPromise = Promise[String] + val responsePromise = Promise[String] + client.execute(code) .onResult(executeResult => { - handleResult(promise, executeResult) + handleResult(responsePromise, executeResult) }).onError(executeReplyError =>{ - handleError(promise, executeReplyError) - }).onSuccess(executeReplyOk => { - handleSuccess(promise, executeReplyOk) - }).onStream(streamResult => { - handleStream(promise, streamResult) - }) + handleError(statusPromise, executeReplyError) + }).onStream(streamResult => { + handleStream(responsePromise, streamResult) + }).onSuccess(executeReplyOk => { + handleSuccess(statusPromise, executeReplyOk) + }) + + val successFuture: Future[String] = statusPromise.future + val responseFuture: Future[String] = + recoverTimeout(responsePromise.future, ResponseTimeout, "") + + val aggregateFuture: Future[String] = for ( + success <- successFuture; + result <- responseFuture + ) yield { + success + result + } + try { + Await.result(aggregateFuture, EvalTimeout) } catch { case t : Throwable => { - log.info("Error submitting request: " + t.getMessage, t) - promise.success("Error submitting request: " + t.getMessage) + "Error submitting request: " + t.getMessage } } - - Await.result(promise.future, Duration.Inf) } } -- cgit v1.2.3