aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuciano Resende <lresende@apache.org>2017-01-17 09:52:48 -0800
committerLuciano Resende <lresende@apache.org>2017-01-17 09:52:48 -0800
commit94a52b4e2564cca7bbf581617e44e764ebdd49d0 (patch)
treec5d460ad1bf73031f92075cefbf954cb1432e174
parentdef1f0416fb5ed5753c426deb03a674207cd76c2 (diff)
downloadtoree-gateway-94a52b4e2564cca7bbf581617e44e764ebdd49d0.tar.gz
toree-gateway-94a52b4e2564cca7bbf581617e44e764ebdd49d0.tar.bz2
toree-gateway-94a52b4e2564cca7bbf581617e44e764ebdd49d0.zip
Propagate errors as success with error details
-rw-r--r--python/toree_kernel.py8
-rw-r--r--src/main/scala/com/ibm/ToreeClient.scala11
2 files changed, 13 insertions, 6 deletions
diff --git a/python/toree_kernel.py b/python/toree_kernel.py
index e73d916..4b6266e 100644
--- a/python/toree_kernel.py
+++ b/python/toree_kernel.py
@@ -19,6 +19,7 @@ import signal
import sys
import time
import io
+import logging
from os import O_NONBLOCK, read
from fcntl import fcntl, F_GETFL, F_SETFL
@@ -70,8 +71,8 @@ class ToreeKernel(MetaKernel):
self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE)
time.sleep(1.5)
self.gateway = JavaGateway(
- start_callback_server=True,
- callback_server_parameters=CallbackServerParameters())
+ start_callback_server=True,
+ callback_server_parameters=CallbackServerParameters())
flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags
fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK)
@@ -128,7 +129,7 @@ class ToreeKernel(MetaKernel):
self.handle_output(self.gateway_proc.stderr, self.Error)
except Py4JError as e:
if not silent:
- self.Error(e.cause)
+ self.Error(format(e))
if retval is None:
return
@@ -139,3 +140,4 @@ class ToreeKernel(MetaKernel):
if __name__ == '__main__':
ToreeKernel.run_as_main()
+
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala
index d3714d6..d21d419 100644
--- a/src/main/scala/com/ibm/ToreeClient.scala
+++ b/src/main/scala/com/ibm/ToreeClient.scala
@@ -38,9 +38,10 @@ class ToreeGateway(client: SparkKernelClient) {
final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
private def handleResult(promise:Promise[String], result: ExecuteResult) = {
+ log.warn(s"Result was: ${result.data(MIMEType.PlainText)}")
// promise.success(result.data(MIMEType.PlainText))
promise.success(result.content)
- log.warn(s"Result was: ${result.data(MIMEType.PlainText)}")
+
}
private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = {
@@ -50,7 +51,8 @@ class ToreeGateway(client: SparkKernelClient) {
private def handleError(promise:Promise[String], reply:ExecuteReplyError) {
log.warn(s"Error was: ${reply.ename.get}")
- promise.failure(new Throwable("Error evaluating paragraph: " + reply.content))
+ //promise.failure(new Throwable("Error evaluating paragraph: " + reply.content))
+ promise.success(reply.status + ":" + reply.ename.getOrElse("") + " - " + reply.evalue.getOrElse(""))
}
private def handleStream(promise:Promise[String], content: StreamContent) {
@@ -73,7 +75,10 @@ class ToreeGateway(client: SparkKernelClient) {
})
} catch {
- case t : Throwable => log.info("Error proxying request: " + t.getMessage, t)
+ case t : Throwable => {
+ log.info("Error proxying request: " + t.getMessage, t)
+ promise.success("Error proxying request: " + t.getMessage)
+ }
}
Await.result(promise.future, Duration.Inf)