From c7569272092eaceb5789e32a9e2987c09c3c4389 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Sat, 14 Jan 2017 10:46:21 -0800 Subject: Enhance Toree client invocation handlers --- src/main/scala/com/ibm/ToreeClient.scala | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala index c9dedcf..8fcd059 100644 --- a/src/main/scala/com/ibm/ToreeClient.scala +++ b/src/main/scala/com/ibm/ToreeClient.scala @@ -7,24 +7,38 @@ import org.apache.toree.kernel.protocol.v5.MIMEType import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution -import org.apache.toree.kernel.protocol.v5.content.ExecuteResult +import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult} import py4j.GatewayServer import scala.concurrent.{Await, Promise} import scala.concurrent.duration.Duration - import org.slf4j.LoggerFactory class ToreeGateway(client: SparkKernelClient) { - // Define our callback - def printResult(result: ExecuteResult) = { + + def handleResult(promise:Promise[String], result: ExecuteResult) = { + promise.success(result.data.get(MIMEType.PlainText).get) println(s"Result was: ${result.data.get(MIMEType.PlainText).get}") } + def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { + println(s"Successful code completion") + println(s"Result: ${executeReplyOk.evalue.get}") + } + + def handleError(promise:Promise[String], reply:ExecuteReplyError) { + println(println(s"Error was: ${reply.ename.get}")) + } + def eval(code: String): Object = { val promise = Promise[String] - val exRes: DeferredExecution = client.execute(code).onResult(er => { - promise.success(er.data.get(MIMEType.PlainText).get) + val exRes: DeferredExecution = client.execute(code) + .onResult(executeResult => { + handleResult(promise, executeResult) + }).onError(executeReplyError =>{ + handleError(promise, executeReplyError) + }).onSuccess(executeReplyOk => { + handleSuccess(promise, executeReplyOk) }) Await.result(promise.future, Duration.Inf) -- cgit v1.2.3