diff options
author | Luciano Resende <lresende@apache.org> | 2017-01-19 12:39:03 -0800 |
---|---|---|
committer | Luciano Resende <lresende@apache.org> | 2017-01-23 11:10:04 -0800 |
commit | 78920739e7f5c3e1ee6d6c4632e513aabf108eb2 (patch) | |
tree | e083c557ff6a77e78ff56980d2a31a9c5d4d68f1 /python/toree_gateway_kernel.py | |
parent | f378b236663cef80a02215714e7b5590fdc8e26b (diff) | |
download | toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.tar.gz toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.tar.bz2 toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.zip |
Lifecycle management for Toree Gateway
The lifecycle enables starting/stopping the
remote Toree instance when a Notebook is
started/stopped.
This also includes other minor changes to
support Python 3.5.x
Diffstat (limited to 'python/toree_gateway_kernel.py')
-rw-r--r-- | python/toree_gateway_kernel.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/python/toree_gateway_kernel.py b/python/toree_gateway_kernel.py new file mode 100644 index 0000000..5791059 --- /dev/null +++ b/python/toree_gateway_kernel.py @@ -0,0 +1,173 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import signal +import sys +import time +import io + +from os import O_NONBLOCK, read +from fcntl import fcntl, F_GETFL, F_SETFL +from subprocess import Popen, PIPE +from metakernel import MetaKernel +from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import +from py4j.protocol import Py4JError + +from config import * +from lifecycle import * +from toree_profile import * + +class TextOutput(object): + """Wrapper for text output whose repr is the text itself. + This avoids `repr(output)` adding quotation marks around already-rendered text. + """ + def __init__(self, output): + self.output = output + + def __repr__(self): + return self.output + +class ToreeGatewayKernel(MetaKernel): + implementation = 'toree_gateway_kernel' + implementation_version = '1.0' + langauge = 'scala' + language_version = '2.11' + banner = "toree_gateway_kernel" + language_info = {'name': 'scala', + 'mimetype': 'application/scala', + 'file_extension': '.scala'} + + configManager = None + toreeLifecycleManager = None + toreeProfile = None + + def __init__(self, **kwargs): + super(ToreeGatewayKernel, self).__init__(**kwargs) + self.configManager = ConfigManager() + self.toreeLifecycleManager = LifecycleManager() + self._start_toree() + # pause, to give time to Toree to start at the backend + time.sleep(5) + # start toree client and connect to backend + self._start_toree_client() + + def sig_handler(signum, frame): + self.gateway_proc.terminate() + self._stop_toree() + + def do_shutdown(self, restart): + super(ToreeGatewayKernel, self).do_shutdown(restart) + self.gateway_proc.terminate() + self._stop_toree() + + def _start_toree(self): + self.toreeProfile = self.toreeLifecycleManager.start_toree() + + def _stop_toree(self): + self.toreeLifecycleManager.stop_toree(self.toreeProfile) + self.toreeProfile = None + + def _start_toree_client(self): + args = [ + "java", + "-classpath", + os.environ["TOREE_GATEWAY_HOME"] + "/lib/toree-gateway-1.0-jar-with-dependencies.jar", + "org.apache.toree.gateway.ToreeGatewayClient", + "--profile", + self.toreeProfile.configurationLocation() + ] + + self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE) + time.sleep(2) + self.gateway = JavaGateway( + start_callback_server=True, + callback_server_parameters=CallbackServerParameters()) + + #flags = fcntl(self.gateway_proc.stdout, fcntl.F_GETFL) # get current p.stdout flags + #fcntl(self.gateway_proc.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + #flags = fcntl(self.gateway_proc.stderr, fcntl.F_GETFL) # get current p.stdout flags + #fcntl(self.gateway_proc.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + signal.signal(signal.SIGTERM, self.sig_handler) + signal.signal(signal.SIGINT, self.sig_handler) + signal.signal(signal.SIGHUP, self.sig_handler) + + + def Error(self, output): + if not output: + return + + super(ToreeGatewayKernel, self).Error(output) + + def handle_output(self, fd, fn): + stringIO = io.StringIO() + while True: + try: + b = read(fd.fileno(), 1024) + if b: + stringIO.write(b.decode('utf-8')) + except OSError: + break + + s = stringIO.getvalue() + if s: + fn(s.strip()) + + stringIO.close() + + def do_execute_direct(self, code, silent=False): + """ + :param code: + The code to be executed. + :param silent: + Whether to display output. + :return: + Return value, or None + + MetaKernel code handler. + """ + + if not code.strip(): + return None + + retval = None + try: + retval = self.gateway.entry_point.eval(code.rstrip()) + """ + This would process stdin and stdout, which would generate + garbage on the ui with any log or other related content + on these streams. For now, disabling it, very useful for + debuging purposes. + + self.handle_output(self.gateway_proc.stdout, self.Print) + self.handle_output(self.gateway_proc.stderr, self.Error) + """ + except Py4JError as e: + if not silent: + self.Error(format(e)) + + if retval is None: + return + elif isinstance(retval, str): + return TextOutput(retval) + else: + return retval + +if __name__ == '__main__': + ToreeGatewayKernel.run_as_main() + |