diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/scala/com/ibm/ToreeClient.scala | 48 |
1 files changed, 33 insertions, 15 deletions
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala index 8fcd059..b9e58b0 100644 --- a/src/main/scala/com/ibm/ToreeClient.scala +++ b/src/main/scala/com/ibm/ToreeClient.scala @@ -1,5 +1,6 @@ package com.ibm + import com.typesafe.config.ConfigFactory import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap import com.typesafe.config.Config @@ -7,39 +8,56 @@ import org.apache.toree.kernel.protocol.v5.MIMEType import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution -import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult} +import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult, StreamContent} import py4j.GatewayServer import scala.concurrent.{Await, Promise} import scala.concurrent.duration.Duration -import org.slf4j.LoggerFactory +import org.slf4j.{Logger, LoggerFactory} + +import scala.util.Try class ToreeGateway(client: SparkKernelClient) { + final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) - def handleResult(promise:Promise[String], result: ExecuteResult) = { - promise.success(result.data.get(MIMEType.PlainText).get) - println(s"Result was: ${result.data.get(MIMEType.PlainText).get}") + private def handleResult(promise:Promise[String], result: ExecuteResult) = { + // promise.success(result.data(MIMEType.PlainText)) + promise.success(result.content) + log.warn(s"Result was: ${result.data(MIMEType.PlainText)}") } - def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { - println(s"Successful code completion") - println(s"Result: ${executeReplyOk.evalue.get}") + private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { + log.warn(s"Successful code completion") + promise.complete(Try("done")) } - def handleError(promise:Promise[String], reply:ExecuteReplyError) { - println(println(s"Error was: ${reply.ename.get}")) + private def handleError(promise:Promise[String], reply:ExecuteReplyError) { + log.warn(s"Error was: ${reply.ename.get}") + promise.failure(new Throwable("Error evaluating paragraph: " + reply.content)) + } + + private def handleStream(promise:Promise[String], content: StreamContent) { + log.warn(s"Received streaming content ${content.name} was: ${content.text}") + promise.success(content.text) } def eval(code: String): Object = { val promise = Promise[String] - val exRes: DeferredExecution = client.execute(code) + try { + val exRes: DeferredExecution = client.execute(code) .onResult(executeResult => { handleResult(promise, executeResult) }).onError(executeReplyError =>{ - handleError(promise, executeReplyError) - }).onSuccess(executeReplyOk => { - handleSuccess(promise, executeReplyOk) - }) + handleError(promise, executeReplyError) + }).onSuccess(executeReplyOk => { + handleSuccess(promise, executeReplyOk) + }).onStream(streamResult => { + handleStream(promise, streamResult) + }) + + } catch { + case t : Throwable => log.info("Error proxying request: " + t.getMessage, t) + } Await.result(promise.future, Duration.Inf) } |