From a6d3e04fd20fde9ecf3de22564c1d9196cae7e40 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Tue, 17 Jan 2017 22:52:11 -0800 Subject: Fix kernel artifact naming and references --- python/toree-gateway-kernel.py | 143 ++++++++++++++++++++++++++++++ python/toree_gateway_kernel.py | 143 ------------------------------ src/main/assembly/distribution.xml | 2 +- src/main/resources/log4j.properties | 2 +- src/main/scala/com/ibm/ToreeGateway.scala | 8 +- 5 files changed, 149 insertions(+), 149 deletions(-) create mode 100644 python/toree-gateway-kernel.py delete mode 100644 python/toree_gateway_kernel.py diff --git a/python/toree-gateway-kernel.py b/python/toree-gateway-kernel.py new file mode 100644 index 0000000..d364407 --- /dev/null +++ b/python/toree-gateway-kernel.py @@ -0,0 +1,143 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import signal +import sys +import time +import io +import logging + +from os import O_NONBLOCK, read +from fcntl import fcntl, F_GETFL, F_SETFL +from subprocess import Popen, PIPE +from metakernel import MetaKernel +from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import +from py4j.protocol import Py4JError + +class TextOutput(object): + """Wrapper for text output whose repr is the text itself. + This avoids `repr(output)` adding quotation marks around already-rendered text. + """ + def __init__(self, output): + self.output = output + + def __repr__(self): + return self.output + +class ToreeKernel(MetaKernel): + implementation = 'toree_gateway_kernel' + implementation_version = '1.0' + langauge = 'scala' + language_version = '2.11' + banner = "toree_gateway_kernel" + language_info = {'name': 'scala', + 'mimetype': 'application/scala', + 'file_extension': '.scala'} + + + def __init__(self, **kwargs): + super(ToreeKernel, self).__init__(**kwargs) + self._start_toree_client() + + def sig_handler(signum, frame): + self.gateway_proc.terminate() + + def do_shutdown(self, restart): + super(ToreeKernel, self).do_shutdown(restart) + self.gateway_proc.terminate() + + def _start_toree_client(self): + args = [ + "java", + "-classpath", + "/opt/toree-gateway/lib/toree-gateway-1.0-jar-with-dependencies.jar", + "com.ibm.ToreeGatewayClient" + ] + + self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE) + time.sleep(1.5) + self.gateway = JavaGateway( + start_callback_server=True, + callback_server_parameters=CallbackServerParameters()) + + flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags + fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK) + + flags = fcntl(self.gateway_proc.stderr, F_GETFL) # get current p.stdout flags + fcntl(self.gateway_proc.stderr, F_SETFL, flags | O_NONBLOCK) + + signal.signal(signal.SIGTERM, self.sig_handler) + signal.signal(signal.SIGINT, self.sig_handler) + signal.signal(signal.SIGHUP, self.sig_handler) + + + def Error(self, output): + if not output: + return + + super(ToreeKernel, self).Error(output) + + def handle_output(self, fd, fn): + stringIO = io.StringIO() + while True: + try: + b = read(fd.fileno(), 1024) + if b: + stringIO.write(b.decode('utf-8')) + except OSError: + break + + s = stringIO.getvalue() + if s: + fn(s.strip()) + + stringIO.close() + + def do_execute_direct(self, code, silent=False): + """ + :param code: + The code to be executed. + :param silent: + Whether to display output. + :return: + Return value, or None + + MetaKernel code handler. + """ + + if not code.strip(): + return None + + retval = None + try: + retval = self.gateway.entry_point.eval(code.rstrip()) + self.handle_output(self.gateway_proc.stdout, self.Print) + self.handle_output(self.gateway_proc.stderr, self.Error) + except Py4JError as e: + if not silent: + self.Error(format(e)) + + if retval is None: + return + elif isinstance(retval, str): + return TextOutput(retval) + else: + return retval + +if __name__ == '__main__': + ToreeKernel.run_as_main() + diff --git a/python/toree_gateway_kernel.py b/python/toree_gateway_kernel.py deleted file mode 100644 index 46b264a..0000000 --- a/python/toree_gateway_kernel.py +++ /dev/null @@ -1,143 +0,0 @@ -# -# (C) Copyright IBM Corp. 2017 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import signal -import sys -import time -import io -import logging - -from os import O_NONBLOCK, read -from fcntl import fcntl, F_GETFL, F_SETFL -from subprocess import Popen, PIPE -from metakernel import MetaKernel -from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import -from py4j.protocol import Py4JError - -class TextOutput(object): - """Wrapper for text output whose repr is the text itself. - This avoids `repr(output)` adding quotation marks around already-rendered text. - """ - def __init__(self, output): - self.output = output - - def __repr__(self): - return self.output - -class ToreeKernel(MetaKernel): - implementation = 'toree_gateway_kernel' - implementation_version = '1.0' - langauge = 'scala' - language_version = '2.11' - banner = "toree_gateway_kernel" - language_info = {'name': 'scala', - 'mimetype': 'application/scala', - 'file_extension': '.scala'} - - - def __init__(self, **kwargs): - super(ToreeKernel, self).__init__(**kwargs) - self._start_toree_client() - - def sig_handler(signum, frame): - self.gateway_proc.terminate() - - def do_shutdown(self, restart): - super(ToreeKernel, self).do_shutdown(restart) - self.gateway_proc.terminate() - - def _start_toree_client(self): - args = [ - "java", - "-classpath", - "/opt/toree_proxy/lib/toree-gateway-1.0-jar-with-dependencies.jar", - "com.ibm.ToreeGateway" - ] - - self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE) - time.sleep(1.5) - self.gateway = JavaGateway( - start_callback_server=True, - callback_server_parameters=CallbackServerParameters()) - - flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags - fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK) - - flags = fcntl(self.gateway_proc.stderr, F_GETFL) # get current p.stdout flags - fcntl(self.gateway_proc.stderr, F_SETFL, flags | O_NONBLOCK) - - signal.signal(signal.SIGTERM, self.sig_handler) - signal.signal(signal.SIGINT, self.sig_handler) - signal.signal(signal.SIGHUP, self.sig_handler) - - - def Error(self, output): - if not output: - return - - super(ToreeKernel, self).Error(output) - - def handle_output(self, fd, fn): - stringIO = io.StringIO() - while True: - try: - b = read(fd.fileno(), 1024) - if b: - stringIO.write(b.decode('utf-8')) - except OSError: - break - - s = stringIO.getvalue() - if s: - fn(s.strip()) - - stringIO.close() - - def do_execute_direct(self, code, silent=False): - """ - :param code: - The code to be executed. - :param silent: - Whether to display output. - :return: - Return value, or None - - MetaKernel code handler. - """ - - if not code.strip(): - return None - - retval = None - try: - retval = self.gateway.entry_point.eval(code.rstrip()) - self.handle_output(self.gateway_proc.stdout, self.Print) - self.handle_output(self.gateway_proc.stderr, self.Error) - except Py4JError as e: - if not silent: - self.Error(format(e)) - - if retval is None: - return - elif isinstance(retval, str): - return TextOutput(retval) - else: - return retval - -if __name__ == '__main__': - ToreeKernel.run_as_main() - diff --git a/src/main/assembly/distribution.xml b/src/main/assembly/distribution.xml index 3e6146a..e2ecf36 100644 --- a/src/main/assembly/distribution.xml +++ b/src/main/assembly/distribution.xml @@ -46,7 +46,7 @@ python setup.py - toree_gateway_kernel.py + toree-gateway-kernel.py diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 36cf436..69bdd14 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -19,7 +19,7 @@ log4j.rootLogger=ALL, file, stdout # Direct log messages to a log file log4j.appender.file=org.apache.log4j.RollingFileAppender -log4j.appender.file.File=/opt/toree_proxy/logs/toree-proxy-client.log +log4j.appender.file.File=/opt/toree-gateway/logs/toree-gateway.log log4j.appender.file.MaxFileSize=10MB log4j.appender.file.MaxBackupIndex=10 log4j.appender.file.layout=org.apache.log4j.PatternLayout diff --git a/src/main/scala/com/ibm/ToreeGateway.scala b/src/main/scala/com/ibm/ToreeGateway.scala index a27e0bb..a8f4bdc 100644 --- a/src/main/scala/com/ibm/ToreeGateway.scala +++ b/src/main/scala/com/ibm/ToreeGateway.scala @@ -78,8 +78,8 @@ class ToreeGateway(client: SparkKernelClient) { } catch { case t : Throwable => { - log.info("Error proxying request: " + t.getMessage, t) - promise.success("Error proxying request: " + t.getMessage) + log.info("Error submitting request: " + t.getMessage, t) + promise.success("Error submitting request: " + t.getMessage) } } @@ -92,7 +92,7 @@ object ToreeGatewayClient extends App { final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) def getConfigurationFilePath: String = { - var filePath = "/opt/toree_proxy/conf/profile.json" + var filePath = "/opt/toree-gateway/conf/profile.json" if (args.length == 0) { for (arg <- args) { @@ -124,7 +124,7 @@ object ToreeGatewayClient extends App { with StandardSystemInitialization with StandardHandlerInitialization).createClient() - val toreeGateway = new ToreeGateway(client) + val toreeGateway = new ToreeClientExecutor(client) val gatewayServer: GatewayServer = new GatewayServer(toreeGateway) gatewayServer.start() -- cgit v1.2.3