aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuciano Resende <lresende@apache.org>2017-01-15 09:49:44 -0800
committerLuciano Resende <lresende@apache.org>2017-01-15 09:49:44 -0800
commit5e29436f55b20324c272cd863ceb8263c0c0ee9a (patch)
tree0ea307e607aa3a98e9be60aa2cdc4561643e1bac
parentc7569272092eaceb5789e32a9e2987c09c3c4389 (diff)
downloadtoree-gateway-5e29436f55b20324c272cd863ceb8263c0c0ee9a.tar.gz
toree-gateway-5e29436f55b20324c272cd863ceb8263c0c0ee9a.tar.bz2
toree-gateway-5e29436f55b20324c272cd863ceb8263c0c0ee9a.zip
Properly display output results to notebook ui
-rw-r--r--src/main/scala/com/ibm/ToreeClient.scala48
1 files changed, 33 insertions, 15 deletions
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala
index 8fcd059..b9e58b0 100644
--- a/src/main/scala/com/ibm/ToreeClient.scala
+++ b/src/main/scala/com/ibm/ToreeClient.scala
@@ -1,5 +1,6 @@
package com.ibm
+
import com.typesafe.config.ConfigFactory
import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap
import com.typesafe.config.Config
@@ -7,39 +8,56 @@ import org.apache.toree.kernel.protocol.v5.MIMEType
import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient
import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
-import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult}
+import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult, StreamContent}
import py4j.GatewayServer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.util.Try
class ToreeGateway(client: SparkKernelClient) {
+ final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
- def handleResult(promise:Promise[String], result: ExecuteResult) = {
- promise.success(result.data.get(MIMEType.PlainText).get)
- println(s"Result was: ${result.data.get(MIMEType.PlainText).get}")
+ private def handleResult(promise:Promise[String], result: ExecuteResult) = {
+ // promise.success(result.data(MIMEType.PlainText))
+ promise.success(result.content)
+ log.warn(s"Result was: ${result.data(MIMEType.PlainText)}")
}
- def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = {
- println(s"Successful code completion")
- println(s"Result: ${executeReplyOk.evalue.get}")
+ private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = {
+ log.warn(s"Successful code completion")
+ promise.complete(Try("done"))
}
- def handleError(promise:Promise[String], reply:ExecuteReplyError) {
- println(println(s"Error was: ${reply.ename.get}"))
+ private def handleError(promise:Promise[String], reply:ExecuteReplyError) {
+ log.warn(s"Error was: ${reply.ename.get}")
+ promise.failure(new Throwable("Error evaluating paragraph: " + reply.content))
+ }
+
+ private def handleStream(promise:Promise[String], content: StreamContent) {
+ log.warn(s"Received streaming content ${content.name} was: ${content.text}")
+ promise.success(content.text)
}
def eval(code: String): Object = {
val promise = Promise[String]
- val exRes: DeferredExecution = client.execute(code)
+ try {
+ val exRes: DeferredExecution = client.execute(code)
.onResult(executeResult => {
handleResult(promise, executeResult)
}).onError(executeReplyError =>{
- handleError(promise, executeReplyError)
- }).onSuccess(executeReplyOk => {
- handleSuccess(promise, executeReplyOk)
- })
+ handleError(promise, executeReplyError)
+ }).onSuccess(executeReplyOk => {
+ handleSuccess(promise, executeReplyOk)
+ }).onStream(streamResult => {
+ handleStream(promise, streamResult)
+ })
+
+ } catch {
+ case t : Throwable => log.info("Error proxying request: " + t.getMessage, t)
+ }
Await.result(promise.future, Duration.Inf)
}