diff options
author | Luciano Resende <lresende@apache.org> | 2017-01-14 10:46:21 -0800 |
---|---|---|
committer | Luciano Resende <lresende@apache.org> | 2017-01-14 10:46:21 -0800 |
commit | c7569272092eaceb5789e32a9e2987c09c3c4389 (patch) | |
tree | 2aec79e1c2abee5a2212a84e2f2a32f734a193f2 | |
parent | 4a0c01b124c3fa9495c5eb2d71af4c3f2e2ffe6e (diff) | |
download | toree-gateway-c7569272092eaceb5789e32a9e2987c09c3c4389.tar.gz toree-gateway-c7569272092eaceb5789e32a9e2987c09c3c4389.tar.bz2 toree-gateway-c7569272092eaceb5789e32a9e2987c09c3c4389.zip |
Enhance Toree client invocation handlers
-rw-r--r-- | src/main/scala/com/ibm/ToreeClient.scala | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala index c9dedcf..8fcd059 100644 --- a/src/main/scala/com/ibm/ToreeClient.scala +++ b/src/main/scala/com/ibm/ToreeClient.scala @@ -7,24 +7,38 @@ import org.apache.toree.kernel.protocol.v5.MIMEType import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution -import org.apache.toree.kernel.protocol.v5.content.ExecuteResult +import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult} import py4j.GatewayServer import scala.concurrent.{Await, Promise} import scala.concurrent.duration.Duration - import org.slf4j.LoggerFactory class ToreeGateway(client: SparkKernelClient) { - // Define our callback - def printResult(result: ExecuteResult) = { + + def handleResult(promise:Promise[String], result: ExecuteResult) = { + promise.success(result.data.get(MIMEType.PlainText).get) println(s"Result was: ${result.data.get(MIMEType.PlainText).get}") } + def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { + println(s"Successful code completion") + println(s"Result: ${executeReplyOk.evalue.get}") + } + + def handleError(promise:Promise[String], reply:ExecuteReplyError) { + println(println(s"Error was: ${reply.ename.get}")) + } + def eval(code: String): Object = { val promise = Promise[String] - val exRes: DeferredExecution = client.execute(code).onResult(er => { - promise.success(er.data.get(MIMEType.PlainText).get) + val exRes: DeferredExecution = client.execute(code) + .onResult(executeResult => { + handleResult(promise, executeResult) + }).onError(executeReplyError =>{ + handleError(promise, executeReplyError) + }).onSuccess(executeReplyOk => { + handleSuccess(promise, executeReplyOk) }) Await.result(promise.future, Duration.Inf) |