aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuciano Resende <lresende@apache.org>2017-01-14 10:46:21 -0800
committerLuciano Resende <lresende@apache.org>2017-01-14 10:46:21 -0800
commitc7569272092eaceb5789e32a9e2987c09c3c4389 (patch)
tree2aec79e1c2abee5a2212a84e2f2a32f734a193f2
parent4a0c01b124c3fa9495c5eb2d71af4c3f2e2ffe6e (diff)
downloadtoree-gateway-c7569272092eaceb5789e32a9e2987c09c3c4389.tar.gz
toree-gateway-c7569272092eaceb5789e32a9e2987c09c3c4389.tar.bz2
toree-gateway-c7569272092eaceb5789e32a9e2987c09c3c4389.zip
Enhance Toree client invocation handlers
-rw-r--r--src/main/scala/com/ibm/ToreeClient.scala26
1 files changed, 20 insertions, 6 deletions
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala
index c9dedcf..8fcd059 100644
--- a/src/main/scala/com/ibm/ToreeClient.scala
+++ b/src/main/scala/com/ibm/ToreeClient.scala
@@ -7,24 +7,38 @@ import org.apache.toree.kernel.protocol.v5.MIMEType
import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient
import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
-import org.apache.toree.kernel.protocol.v5.content.ExecuteResult
+import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult}
import py4j.GatewayServer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
-
import org.slf4j.LoggerFactory
class ToreeGateway(client: SparkKernelClient) {
- // Define our callback
- def printResult(result: ExecuteResult) = {
+
+ def handleResult(promise:Promise[String], result: ExecuteResult) = {
+ promise.success(result.data.get(MIMEType.PlainText).get)
println(s"Result was: ${result.data.get(MIMEType.PlainText).get}")
}
+ def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = {
+ println(s"Successful code completion")
+ println(s"Result: ${executeReplyOk.evalue.get}")
+ }
+
+ def handleError(promise:Promise[String], reply:ExecuteReplyError) {
+ println(println(s"Error was: ${reply.ename.get}"))
+ }
+
def eval(code: String): Object = {
val promise = Promise[String]
- val exRes: DeferredExecution = client.execute(code).onResult(er => {
- promise.success(er.data.get(MIMEType.PlainText).get)
+ val exRes: DeferredExecution = client.execute(code)
+ .onResult(executeResult => {
+ handleResult(promise, executeResult)
+ }).onError(executeReplyError =>{
+ handleError(promise, executeReplyError)
+ }).onSuccess(executeReplyOk => {
+ handleSuccess(promise, executeReplyOk)
})
Await.result(promise.future, Duration.Inf)