diff options
author | Luciano Resende <lresende@apache.org> | 2017-01-19 12:39:03 -0800 |
---|---|---|
committer | Luciano Resende <lresende@apache.org> | 2017-01-23 11:10:04 -0800 |
commit | 78920739e7f5c3e1ee6d6c4632e513aabf108eb2 (patch) | |
tree | e083c557ff6a77e78ff56980d2a31a9c5d4d68f1 /python | |
parent | f378b236663cef80a02215714e7b5590fdc8e26b (diff) | |
download | toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.tar.gz toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.tar.bz2 toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.zip |
Lifecycle management for Toree Gateway
The lifecycle enables starting/stopping the
remote Toree instance when a Notebook is
started/stopped.
This also includes other minor changes to
support Python 3.5.x
Diffstat (limited to 'python')
-rw-r--r-- | python/config.py | 82 | ||||
-rw-r--r-- | python/lifecycle.py | 88 | ||||
-rw-r--r-- | python/toree_gateway_kernel.py (renamed from python/toree-gateway-kernel.py) | 49 | ||||
-rw-r--r-- | python/toree_manager.py | 152 | ||||
-rw-r--r-- | python/toree_profile.py | 78 |
5 files changed, 436 insertions, 13 deletions
diff --git a/python/config.py b/python/config.py new file mode 100644 index 0000000..e7e69ea --- /dev/null +++ b/python/config.py @@ -0,0 +1,82 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import os.path +import configparser + +class ConfigManager: + config = configparser.RawConfigParser() + homePath = os.getcwd()[:-7] + configPath = None + profilesPath = None + + def __init__(self): + if os.environ["TOREE_GATEWAY_HOME"]: + self.configPath = os.environ["TOREE_GATEWAY_HOME"] + '/conf' + self.profilesPath = os.environ["TOREE_GATEWAY_HOME"] + '/profiles' + else: + self.configPath = self.homePath + '/src/main/resources/' + self.profilesPath = self.homePath + '/src/main/resources/profiles' + + self.config.read(self.configPath + '/toree-gateway.properties') + + def getHomeFolder(self): + """ + Return home folder based on where app is running + :return: + """ + return self.homePath + + def getConfigurationFolder(self): + """ + Return the location where configuration file is being read + :return: + """ + return self.configPath + + def getProfilesFolder(self): + """ + Return the location where profiles information are is being read + :return: + """ + return self.profilesPath + + + def get(self, key): + """ + Return a configuration from gegeral section + :param key: the configuration key + :return: + """ + return self.config.get('general', key) + + def getBySection(self, section, key): + """ + Return a configuration from a specific section and key + :param section: the configuration section + :param key: the configuration key + :return: + """ + return self.config.get(section, key) + + +""" +c = ConfigManager() +print c.getHomeFolder() +print c.getConfigurationFolder() +print c.getProfilesFolder() +"""
\ No newline at end of file diff --git a/python/lifecycle.py b/python/lifecycle.py new file mode 100644 index 0000000..39e1638 --- /dev/null +++ b/python/lifecycle.py @@ -0,0 +1,88 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import os.path +import time + +from config import * +from toree_manager import * + +class LifecycleManager: + configManager = None + toreeManager = None + + def __init__(self): + self.configManager = ConfigManager() + self.toreeManager = ToreeManager() + + def _reserve_profile(self): + """ + Tries to reserv a profile configuration to access Toree + :return: the reserved profile folder location, or None + """ + profilesFolder = self.configManager.getProfilesFolder() + profile = None + """Lock mutext to avoid two processes to select same kernel config""" + # mutex = open(self.configManager.getProfilesFolder() + "/mutex") + # lock = fcntl.flock(mutex, fcntl.LOCK_EX) + # print(lock) + """Select from the available kernel configurations""" + for (path, dirs, files) in os.walk(profilesFolder): + for folderName in dirs: + profile = Profile(profilesFolder + '/' + folderName) + if profile.isAvailable(): + profile.reserve() + break + """Unlock the mutex enabling other processes to select same kernel config""" + # fcntl.flock(self.mutex, fcntl.LOCK_UN) + + return profile + + def _release_profile(self, profile): + profile.release() + + def start_toree(self): + """ + Reserve a profile to use, and start a remote Toree instance with that configuration + :return: the path to the configuration file (profile.json) to use when connecting + """ + profile = self._reserve_profile() + if profile is None: + raise RuntimeError('No server resources available.') + self.toreeManager.start_toree(profile) + + return profile + + def stop_toree(self, profile): + self.toreeManager.stop_toree(profile) + self._release_profile(profile) + + +""" +manager = LifecycleManager() +print('Starting first toree kernel') +p1 = manager.start_toree() +print(p1.profilePath) +print('Starting second toree kernel') +time.sleep(15) +#p2 = manager.start_toree() +#print p2.profilePath +#print'Stopping toree kernels' +manager.stop_toree(p1) +#manager.stop_toree(p2) +print('finished') +""" diff --git a/python/toree-gateway-kernel.py b/python/toree_gateway_kernel.py index 390b904..5791059 100644 --- a/python/toree-gateway-kernel.py +++ b/python/toree_gateway_kernel.py @@ -19,7 +19,6 @@ import signal import sys import time import io -import logging from os import O_NONBLOCK, read from fcntl import fcntl, F_GETFL, F_SETFL @@ -28,6 +27,10 @@ from metakernel import MetaKernel from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import from py4j.protocol import Py4JError +from config import * +from lifecycle import * +from toree_profile import * + class TextOutput(object): """Wrapper for text output whose repr is the text itself. This avoids `repr(output)` adding quotation marks around already-rendered text. @@ -38,7 +41,7 @@ class TextOutput(object): def __repr__(self): return self.output -class ToreeKernel(MetaKernel): +class ToreeGatewayKernel(MetaKernel): implementation = 'toree_gateway_kernel' implementation_version = '1.0' langauge = 'scala' @@ -48,37 +51,57 @@ class ToreeKernel(MetaKernel): 'mimetype': 'application/scala', 'file_extension': '.scala'} + configManager = None + toreeLifecycleManager = None + toreeProfile = None def __init__(self, **kwargs): - super(ToreeKernel, self).__init__(**kwargs) + super(ToreeGatewayKernel, self).__init__(**kwargs) + self.configManager = ConfigManager() + self.toreeLifecycleManager = LifecycleManager() + self._start_toree() + # pause, to give time to Toree to start at the backend + time.sleep(5) + # start toree client and connect to backend self._start_toree_client() def sig_handler(signum, frame): self.gateway_proc.terminate() + self._stop_toree() def do_shutdown(self, restart): - super(ToreeKernel, self).do_shutdown(restart) + super(ToreeGatewayKernel, self).do_shutdown(restart) self.gateway_proc.terminate() + self._stop_toree() + + def _start_toree(self): + self.toreeProfile = self.toreeLifecycleManager.start_toree() + + def _stop_toree(self): + self.toreeLifecycleManager.stop_toree(self.toreeProfile) + self.toreeProfile = None def _start_toree_client(self): args = [ "java", "-classpath", - "/opt/toree-gateway/lib/toree-gateway-1.0-jar-with-dependencies.jar", - "org.apache.toree.gateway.ToreeGatewayClient" + os.environ["TOREE_GATEWAY_HOME"] + "/lib/toree-gateway-1.0-jar-with-dependencies.jar", + "org.apache.toree.gateway.ToreeGatewayClient", + "--profile", + self.toreeProfile.configurationLocation() ] self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE) - time.sleep(1.5) + time.sleep(2) self.gateway = JavaGateway( start_callback_server=True, callback_server_parameters=CallbackServerParameters()) - flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags - fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK) + #flags = fcntl(self.gateway_proc.stdout, fcntl.F_GETFL) # get current p.stdout flags + #fcntl(self.gateway_proc.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) - flags = fcntl(self.gateway_proc.stderr, F_GETFL) # get current p.stdout flags - fcntl(self.gateway_proc.stderr, F_SETFL, flags | O_NONBLOCK) + #flags = fcntl(self.gateway_proc.stderr, fcntl.F_GETFL) # get current p.stdout flags + #fcntl(self.gateway_proc.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK) signal.signal(signal.SIGTERM, self.sig_handler) signal.signal(signal.SIGINT, self.sig_handler) @@ -89,7 +112,7 @@ class ToreeKernel(MetaKernel): if not output: return - super(ToreeKernel, self).Error(output) + super(ToreeGatewayKernel, self).Error(output) def handle_output(self, fd, fn): stringIO = io.StringIO() @@ -146,5 +169,5 @@ class ToreeKernel(MetaKernel): return retval if __name__ == '__main__': - ToreeKernel.run_as_main() + ToreeGatewayKernel.run_as_main() diff --git a/python/toree_manager.py b/python/toree_manager.py new file mode 100644 index 0000000..996ad9e --- /dev/null +++ b/python/toree_manager.py @@ -0,0 +1,152 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import os.path +import fcntl +import time +import json +import datetime + +import base64 +import paramiko +from paramiko import SSHClient, SSHException +from pprint import pprint +from socket import * + +from toree_profile import * +from config import * + +class ToreeManager: + configManager = None + + def __init__(self): + self.configManager = ConfigManager() + + def getSSHClient(self): + client = SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + client.connect( \ + self.configManager.get('toree.ip'), \ + username=self.configManager.get('toree.username'), \ + password=self.configManager.get('toree.password')) + + #print('Client connected successfuly') + + return client + + def getSSHChannel(self, client): + channel = client.invoke_shell() + channel.settimeout(20) + channel.set_combine_stderr(True) + + return channel + + def start_toree(self, profile): + try: + client = self.getSSHClient() + channel = self.getSSHChannel(client) + + channel = client.invoke_shell() + channel.settimeout(30) + channel.set_combine_stderr(True) + + stdin = channel.makefile('w') + stdout = channel.makefile('r') + stderr = channel.makefile_stderr('r') + + config = profile.config() + command = ''' + cd {} && + . tkarra.sh --ip {} --stdin-port {} --control-port {} --shell-port {} --iopub-port {} --heartbeat-port {} && + exit + '''.format( \ + self.configManager.get('toree.home') + "/bin", \ + config['ip'], \ + config['stdin_port'], \ + config['control_port'], \ + config['shell_port'], \ + config['iopub_port'], \ + config['hb_port']) + + + #print(command) + + stdin.write(command) + + pid = None + for line in stdout: + pid = line.strip() + + profile.updatePid(pid) + + except SSHException as e: + pprint(e) + except timeout: + print('caught a timeout') + finally: + print('closing connection') + if stderr: + stderr.close() + if stdout: + stdout.close() + if stdin: + stdin.close() + if channel: + channel.close() + if client: + client.close() + print('all closed') + + def stop_toree(self, profile): + try: + client = self.getSSHClient() + channel = self.getSSHChannel(client) + + stdin = channel.makefile('w') + stdout = channel.makefile('r') + stderr = channel.makefile_stderr('r') + + command = ''' + kill -9 {} + '''.format(profile.pid()) + + print(command) + + stdin.write(command) + + except timeout: + print('caught a timeout') + finally: + print('closing connection') + if stdout: + stdout.close() + if channel: + channel.close() + if client: + client.close() + print('all closed') + + +""" +p = Profile('/Users/lresende/opensource/jupyter/toree-gateway/src/main/resources/profiles/kernel-1') +t = ToreeManager() + +t.start_toree(p) +t.stop_toree(p) +"""
\ No newline at end of file diff --git a/python/toree_profile.py b/python/toree_profile.py new file mode 100644 index 0000000..eaadc9d --- /dev/null +++ b/python/toree_profile.py @@ -0,0 +1,78 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import os +import os.path +import json +import time + +class Profile: + PROCESS_FILE_NAME = "toree.pid" + CONFIGURATION_FILE_NAME = "profile.json" + + profilePath = None + + def __init__(self, profilePath): + self.profilePath = profilePath + + def pidLocation(self): + return self.profilePath + '/' + self.PROCESS_FILE_NAME + + def configurationLocation(self): + return self.profilePath + '/' + self.CONFIGURATION_FILE_NAME + + def isAvailable(self): + taken = os.path.exists(self.pidLocation()) + return not taken + + def reserve(self): + open(self.pidLocation(), 'a').close() + + def release(self): + if os.path.exists(self.pidLocation()): + os.remove(self.pidLocation()) + + def pid(self): + with open(self.pidLocation(), 'r') as pid_file: + data = pid_file.read() + return data + + def updatePid(self, pid): + try: + file = open(self.pidLocation(), 'w') + file.write(pid) + finally: + file.close + + def config(self): + with open(self.configurationLocation(), 'r') as data_file: + return json.load(data_file) #, object_hook=util._byteify + + + +""" +p = Profile('/Users/lresende/opensource/jupyter/toree-gateway/src/main/resources/profiles/kernel-1') +print(p.pidLocation()) +print(p.configurationLocation()) +print(p.isAvailable()) +c = p.config() +print(c['stdin_port']) +p.reserve() +time.sleep(10) +p.release() +""" + |