aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/com/ibm/ToreeClient.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/com/ibm/ToreeClient.scala')
-rw-r--r--src/main/scala/com/ibm/ToreeClient.scala26
1 files changed, 20 insertions, 6 deletions
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala
index c9dedcf..8fcd059 100644
--- a/src/main/scala/com/ibm/ToreeClient.scala
+++ b/src/main/scala/com/ibm/ToreeClient.scala
@@ -7,24 +7,38 @@ import org.apache.toree.kernel.protocol.v5.MIMEType
import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient
import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
-import org.apache.toree.kernel.protocol.v5.content.ExecuteResult
+import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult}
import py4j.GatewayServer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
-
import org.slf4j.LoggerFactory
class ToreeGateway(client: SparkKernelClient) {
- // Define our callback
- def printResult(result: ExecuteResult) = {
+
+ def handleResult(promise:Promise[String], result: ExecuteResult) = {
+ promise.success(result.data.get(MIMEType.PlainText).get)
println(s"Result was: ${result.data.get(MIMEType.PlainText).get}")
}
+ def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = {
+ println(s"Successful code completion")
+ println(s"Result: ${executeReplyOk.evalue.get}")
+ }
+
+ def handleError(promise:Promise[String], reply:ExecuteReplyError) {
+ println(println(s"Error was: ${reply.ename.get}"))
+ }
+
def eval(code: String): Object = {
val promise = Promise[String]
- val exRes: DeferredExecution = client.execute(code).onResult(er => {
- promise.success(er.data.get(MIMEType.PlainText).get)
+ val exRes: DeferredExecution = client.execute(code)
+ .onResult(executeResult => {
+ handleResult(promise, executeResult)
+ }).onError(executeReplyError =>{
+ handleError(promise, executeReplyError)
+ }).onSuccess(executeReplyOk => {
+ handleSuccess(promise, executeReplyOk)
})
Await.result(promise.future, Duration.Inf)