aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuciano Resende <lresende@apache.org>2017-01-19 12:39:03 -0800
committerLuciano Resende <lresende@apache.org>2017-01-23 11:10:04 -0800
commit78920739e7f5c3e1ee6d6c4632e513aabf108eb2 (patch)
treee083c557ff6a77e78ff56980d2a31a9c5d4d68f1
parentf378b236663cef80a02215714e7b5590fdc8e26b (diff)
downloadtoree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.tar.gz
toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.tar.bz2
toree-gateway-78920739e7f5c3e1ee6d6c4632e513aabf108eb2.zip
Lifecycle management for Toree Gateway
The lifecycle enables starting/stopping the remote Toree instance when a Notebook is started/stopped. This also includes other minor changes to support Python 3.5.x
-rw-r--r--.gitignore7
-rw-r--r--kernel.json2
-rw-r--r--python/config.py82
-rw-r--r--python/lifecycle.py88
-rw-r--r--python/toree_gateway_kernel.py (renamed from python/toree-gateway-kernel.py)49
-rw-r--r--python/toree_manager.py152
-rw-r--r--python/toree_profile.py78
-rw-r--r--src/main/assembly/distribution.xml17
-rw-r--r--src/main/resources/profiles/kernel-1/profile.json (renamed from src/main/resources/profile.json.template)2
-rw-r--r--src/main/resources/profiles/kernel-2/profile.json11
-rw-r--r--src/main/resources/profiles/kernel-3/profile.json11
-rw-r--r--src/main/resources/profiles/mutex15
-rw-r--r--src/main/resources/toree-gateway.properties.template29
-rw-r--r--src/main/scala/org/apache/toree/gateway/ToreeGateway.scala2
14 files changed, 527 insertions, 18 deletions
diff --git a/.gitignore b/.gitignore
index 37175d5..1d6e284 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,11 @@
target
dependency-reduced-pom.xml
+#python files
+*.pyc
+
+
+#Application specific
+*.pid
src/main/resources/profile.json
+src/main/resources/toree-gateway.properties
diff --git a/kernel.json b/kernel.json
index 1a7e342..52e57d9 100644
--- a/kernel.json
+++ b/kernel.json
@@ -5,7 +5,7 @@
"display_name": "Toree Gateway Kernel",
"argv": [
"python",
- "/opt/toree-gateway/python/toree-gateway-kernel.py",
+ "/opt/toree-gateway/python/toree_gateway_kernel.py",
"-f",
"{connection_file}"
],
diff --git a/python/config.py b/python/config.py
new file mode 100644
index 0000000..e7e69ea
--- /dev/null
+++ b/python/config.py
@@ -0,0 +1,82 @@
+#
+# (C) Copyright IBM Corp. 2017
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import os.path
+import configparser
+
+class ConfigManager:
+ config = configparser.RawConfigParser()
+ homePath = os.getcwd()[:-7]
+ configPath = None
+ profilesPath = None
+
+ def __init__(self):
+ if os.environ["TOREE_GATEWAY_HOME"]:
+ self.configPath = os.environ["TOREE_GATEWAY_HOME"] + '/conf'
+ self.profilesPath = os.environ["TOREE_GATEWAY_HOME"] + '/profiles'
+ else:
+ self.configPath = self.homePath + '/src/main/resources/'
+ self.profilesPath = self.homePath + '/src/main/resources/profiles'
+
+ self.config.read(self.configPath + '/toree-gateway.properties')
+
+ def getHomeFolder(self):
+ """
+ Return home folder based on where app is running
+ :return:
+ """
+ return self.homePath
+
+ def getConfigurationFolder(self):
+ """
+ Return the location where configuration file is being read
+ :return:
+ """
+ return self.configPath
+
+ def getProfilesFolder(self):
+ """
+ Return the location where profiles information are is being read
+ :return:
+ """
+ return self.profilesPath
+
+
+ def get(self, key):
+ """
+ Return a configuration from gegeral section
+ :param key: the configuration key
+ :return:
+ """
+ return self.config.get('general', key)
+
+ def getBySection(self, section, key):
+ """
+ Return a configuration from a specific section and key
+ :param section: the configuration section
+ :param key: the configuration key
+ :return:
+ """
+ return self.config.get(section, key)
+
+
+"""
+c = ConfigManager()
+print c.getHomeFolder()
+print c.getConfigurationFolder()
+print c.getProfilesFolder()
+""" \ No newline at end of file
diff --git a/python/lifecycle.py b/python/lifecycle.py
new file mode 100644
index 0000000..39e1638
--- /dev/null
+++ b/python/lifecycle.py
@@ -0,0 +1,88 @@
+#
+# (C) Copyright IBM Corp. 2017
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import os.path
+import time
+
+from config import *
+from toree_manager import *
+
+class LifecycleManager:
+ configManager = None
+ toreeManager = None
+
+ def __init__(self):
+ self.configManager = ConfigManager()
+ self.toreeManager = ToreeManager()
+
+ def _reserve_profile(self):
+ """
+ Tries to reserv a profile configuration to access Toree
+ :return: the reserved profile folder location, or None
+ """
+ profilesFolder = self.configManager.getProfilesFolder()
+ profile = None
+ """Lock mutext to avoid two processes to select same kernel config"""
+ # mutex = open(self.configManager.getProfilesFolder() + "/mutex")
+ # lock = fcntl.flock(mutex, fcntl.LOCK_EX)
+ # print(lock)
+ """Select from the available kernel configurations"""
+ for (path, dirs, files) in os.walk(profilesFolder):
+ for folderName in dirs:
+ profile = Profile(profilesFolder + '/' + folderName)
+ if profile.isAvailable():
+ profile.reserve()
+ break
+ """Unlock the mutex enabling other processes to select same kernel config"""
+ # fcntl.flock(self.mutex, fcntl.LOCK_UN)
+
+ return profile
+
+ def _release_profile(self, profile):
+ profile.release()
+
+ def start_toree(self):
+ """
+ Reserve a profile to use, and start a remote Toree instance with that configuration
+ :return: the path to the configuration file (profile.json) to use when connecting
+ """
+ profile = self._reserve_profile()
+ if profile is None:
+ raise RuntimeError('No server resources available.')
+ self.toreeManager.start_toree(profile)
+
+ return profile
+
+ def stop_toree(self, profile):
+ self.toreeManager.stop_toree(profile)
+ self._release_profile(profile)
+
+
+"""
+manager = LifecycleManager()
+print('Starting first toree kernel')
+p1 = manager.start_toree()
+print(p1.profilePath)
+print('Starting second toree kernel')
+time.sleep(15)
+#p2 = manager.start_toree()
+#print p2.profilePath
+#print'Stopping toree kernels'
+manager.stop_toree(p1)
+#manager.stop_toree(p2)
+print('finished')
+"""
diff --git a/python/toree-gateway-kernel.py b/python/toree_gateway_kernel.py
index 390b904..5791059 100644
--- a/python/toree-gateway-kernel.py
+++ b/python/toree_gateway_kernel.py
@@ -19,7 +19,6 @@ import signal
import sys
import time
import io
-import logging
from os import O_NONBLOCK, read
from fcntl import fcntl, F_GETFL, F_SETFL
@@ -28,6 +27,10 @@ from metakernel import MetaKernel
from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import
from py4j.protocol import Py4JError
+from config import *
+from lifecycle import *
+from toree_profile import *
+
class TextOutput(object):
"""Wrapper for text output whose repr is the text itself.
This avoids `repr(output)` adding quotation marks around already-rendered text.
@@ -38,7 +41,7 @@ class TextOutput(object):
def __repr__(self):
return self.output
-class ToreeKernel(MetaKernel):
+class ToreeGatewayKernel(MetaKernel):
implementation = 'toree_gateway_kernel'
implementation_version = '1.0'
langauge = 'scala'
@@ -48,37 +51,57 @@ class ToreeKernel(MetaKernel):
'mimetype': 'application/scala',
'file_extension': '.scala'}
+ configManager = None
+ toreeLifecycleManager = None
+ toreeProfile = None
def __init__(self, **kwargs):
- super(ToreeKernel, self).__init__(**kwargs)
+ super(ToreeGatewayKernel, self).__init__(**kwargs)
+ self.configManager = ConfigManager()
+ self.toreeLifecycleManager = LifecycleManager()
+ self._start_toree()
+ # pause, to give time to Toree to start at the backend
+ time.sleep(5)
+ # start toree client and connect to backend
self._start_toree_client()
def sig_handler(signum, frame):
self.gateway_proc.terminate()
+ self._stop_toree()
def do_shutdown(self, restart):
- super(ToreeKernel, self).do_shutdown(restart)
+ super(ToreeGatewayKernel, self).do_shutdown(restart)
self.gateway_proc.terminate()
+ self._stop_toree()
+
+ def _start_toree(self):
+ self.toreeProfile = self.toreeLifecycleManager.start_toree()
+
+ def _stop_toree(self):
+ self.toreeLifecycleManager.stop_toree(self.toreeProfile)
+ self.toreeProfile = None
def _start_toree_client(self):
args = [
"java",
"-classpath",
- "/opt/toree-gateway/lib/toree-gateway-1.0-jar-with-dependencies.jar",
- "org.apache.toree.gateway.ToreeGatewayClient"
+ os.environ["TOREE_GATEWAY_HOME"] + "/lib/toree-gateway-1.0-jar-with-dependencies.jar",
+ "org.apache.toree.gateway.ToreeGatewayClient",
+ "--profile",
+ self.toreeProfile.configurationLocation()
]
self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE)
- time.sleep(1.5)
+ time.sleep(2)
self.gateway = JavaGateway(
start_callback_server=True,
callback_server_parameters=CallbackServerParameters())
- flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags
- fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK)
+ #flags = fcntl(self.gateway_proc.stdout, fcntl.F_GETFL) # get current p.stdout flags
+ #fcntl(self.gateway_proc.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- flags = fcntl(self.gateway_proc.stderr, F_GETFL) # get current p.stdout flags
- fcntl(self.gateway_proc.stderr, F_SETFL, flags | O_NONBLOCK)
+ #flags = fcntl(self.gateway_proc.stderr, fcntl.F_GETFL) # get current p.stdout flags
+ #fcntl(self.gateway_proc.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)
signal.signal(signal.SIGTERM, self.sig_handler)
signal.signal(signal.SIGINT, self.sig_handler)
@@ -89,7 +112,7 @@ class ToreeKernel(MetaKernel):
if not output:
return
- super(ToreeKernel, self).Error(output)
+ super(ToreeGatewayKernel, self).Error(output)
def handle_output(self, fd, fn):
stringIO = io.StringIO()
@@ -146,5 +169,5 @@ class ToreeKernel(MetaKernel):
return retval
if __name__ == '__main__':
- ToreeKernel.run_as_main()
+ ToreeGatewayKernel.run_as_main()
diff --git a/python/toree_manager.py b/python/toree_manager.py
new file mode 100644
index 0000000..996ad9e
--- /dev/null
+++ b/python/toree_manager.py
@@ -0,0 +1,152 @@
+#
+# (C) Copyright IBM Corp. 2017
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import os.path
+import fcntl
+import time
+import json
+import datetime
+
+import base64
+import paramiko
+from paramiko import SSHClient, SSHException
+from pprint import pprint
+from socket import *
+
+from toree_profile import *
+from config import *
+
+class ToreeManager:
+ configManager = None
+
+ def __init__(self):
+ self.configManager = ConfigManager()
+
+ def getSSHClient(self):
+ client = SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ client.connect( \
+ self.configManager.get('toree.ip'), \
+ username=self.configManager.get('toree.username'), \
+ password=self.configManager.get('toree.password'))
+
+ #print('Client connected successfuly')
+
+ return client
+
+ def getSSHChannel(self, client):
+ channel = client.invoke_shell()
+ channel.settimeout(20)
+ channel.set_combine_stderr(True)
+
+ return channel
+
+ def start_toree(self, profile):
+ try:
+ client = self.getSSHClient()
+ channel = self.getSSHChannel(client)
+
+ channel = client.invoke_shell()
+ channel.settimeout(30)
+ channel.set_combine_stderr(True)
+
+ stdin = channel.makefile('w')
+ stdout = channel.makefile('r')
+ stderr = channel.makefile_stderr('r')
+
+ config = profile.config()
+ command = '''
+ cd {} &&
+ . tkarra.sh --ip {} --stdin-port {} --control-port {} --shell-port {} --iopub-port {} --heartbeat-port {} &&
+ exit
+ '''.format( \
+ self.configManager.get('toree.home') + "/bin", \
+ config['ip'], \
+ config['stdin_port'], \
+ config['control_port'], \
+ config['shell_port'], \
+ config['iopub_port'], \
+ config['hb_port'])
+
+
+ #print(command)
+
+ stdin.write(command)
+
+ pid = None
+ for line in stdout:
+ pid = line.strip()
+
+ profile.updatePid(pid)
+
+ except SSHException as e:
+ pprint(e)
+ except timeout:
+ print('caught a timeout')
+ finally:
+ print('closing connection')
+ if stderr:
+ stderr.close()
+ if stdout:
+ stdout.close()
+ if stdin:
+ stdin.close()
+ if channel:
+ channel.close()
+ if client:
+ client.close()
+ print('all closed')
+
+ def stop_toree(self, profile):
+ try:
+ client = self.getSSHClient()
+ channel = self.getSSHChannel(client)
+
+ stdin = channel.makefile('w')
+ stdout = channel.makefile('r')
+ stderr = channel.makefile_stderr('r')
+
+ command = '''
+ kill -9 {}
+ '''.format(profile.pid())
+
+ print(command)
+
+ stdin.write(command)
+
+ except timeout:
+ print('caught a timeout')
+ finally:
+ print('closing connection')
+ if stdout:
+ stdout.close()
+ if channel:
+ channel.close()
+ if client:
+ client.close()
+ print('all closed')
+
+
+"""
+p = Profile('/Users/lresende/opensource/jupyter/toree-gateway/src/main/resources/profiles/kernel-1')
+t = ToreeManager()
+
+t.start_toree(p)
+t.stop_toree(p)
+""" \ No newline at end of file
diff --git a/python/toree_profile.py b/python/toree_profile.py
new file mode 100644
index 0000000..eaadc9d
--- /dev/null
+++ b/python/toree_profile.py
@@ -0,0 +1,78 @@
+#
+# (C) Copyright IBM Corp. 2017
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import os
+import os.path
+import json
+import time
+
+class Profile:
+ PROCESS_FILE_NAME = "toree.pid"
+ CONFIGURATION_FILE_NAME = "profile.json"
+
+ profilePath = None
+
+ def __init__(self, profilePath):
+ self.profilePath = profilePath
+
+ def pidLocation(self):
+ return self.profilePath + '/' + self.PROCESS_FILE_NAME
+
+ def configurationLocation(self):
+ return self.profilePath + '/' + self.CONFIGURATION_FILE_NAME
+
+ def isAvailable(self):
+ taken = os.path.exists(self.pidLocation())
+ return not taken
+
+ def reserve(self):
+ open(self.pidLocation(), 'a').close()
+
+ def release(self):
+ if os.path.exists(self.pidLocation()):
+ os.remove(self.pidLocation())
+
+ def pid(self):
+ with open(self.pidLocation(), 'r') as pid_file:
+ data = pid_file.read()
+ return data
+
+ def updatePid(self, pid):
+ try:
+ file = open(self.pidLocation(), 'w')
+ file.write(pid)
+ finally:
+ file.close
+
+ def config(self):
+ with open(self.configurationLocation(), 'r') as data_file:
+ return json.load(data_file) #, object_hook=util._byteify
+
+
+
+"""
+p = Profile('/Users/lresende/opensource/jupyter/toree-gateway/src/main/resources/profiles/kernel-1')
+print(p.pidLocation())
+print(p.configurationLocation())
+print(p.isAvailable())
+c = p.config()
+print(c['stdin_port'])
+p.reserve()
+time.sleep(10)
+p.release()
+"""
+
diff --git a/src/main/assembly/distribution.xml b/src/main/assembly/distribution.xml
index 11b3429..6d2bde8 100644
--- a/src/main/assembly/distribution.xml
+++ b/src/main/assembly/distribution.xml
@@ -45,10 +45,23 @@
<directory>python</directory>
<outputDirectory>python</outputDirectory>
<includes>
- <include>setup.py</include>
- <include>toree-gateway-kernel.py</include>
+ <include>*.py</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>src/main/resources</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>*.template</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/resources/profiles</directory>
+ <outputDirectory>profiles</outputDirectory>
+ <excludes>
+ <exclude>*template</exclude>
+ </excludes>
+ </fileSet>
</fileSets>
<files>
diff --git a/src/main/resources/profile.json.template b/src/main/resources/profiles/kernel-1/profile.json
index b473a4d..3d65897 100644
--- a/src/main/resources/profile.json.template
+++ b/src/main/resources/profiles/kernel-1/profile.json
@@ -4,7 +4,7 @@
"hb_port": 48705,
"shell_port": 48703,
"iopub_port": 48704,
- "ip": "9.125.72.72",
+ "ip": "9.30.137.220",
"transport": "tcp",
"signature_scheme": "hmac-sha256",
"key": ""
diff --git a/src/main/resources/profiles/kernel-2/profile.json b/src/main/resources/profiles/kernel-2/profile.json
new file mode 100644
index 0000000..0953896
--- /dev/null
+++ b/src/main/resources/profiles/kernel-2/profile.json
@@ -0,0 +1,11 @@
+{
+ "stdin_port": 48706,
+ "control_port": 48707,
+ "hb_port": 48710,
+ "shell_port": 48708,
+ "iopub_port": 48709,
+ "ip": "9.30.137.220",
+ "transport": "tcp",
+ "signature_scheme": "hmac-sha256",
+ "key": ""
+} \ No newline at end of file
diff --git a/src/main/resources/profiles/kernel-3/profile.json b/src/main/resources/profiles/kernel-3/profile.json
new file mode 100644
index 0000000..72113b1
--- /dev/null
+++ b/src/main/resources/profiles/kernel-3/profile.json
@@ -0,0 +1,11 @@
+{
+ "stdin_port": 48711,
+ "control_port": 48712,
+ "hb_port": 48715,
+ "shell_port": 48713,
+ "iopub_port": 48714,
+ "ip": "9.30.137.220",
+ "transport": "tcp",
+ "signature_scheme": "hmac-sha256",
+ "key": ""
+} \ No newline at end of file
diff --git a/src/main/resources/profiles/mutex b/src/main/resources/profiles/mutex
new file mode 100644
index 0000000..270443d
--- /dev/null
+++ b/src/main/resources/profiles/mutex
@@ -0,0 +1,15 @@
+#
+# (C) Copyright IBM Corp. 2017
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# \ No newline at end of file
diff --git a/src/main/resources/toree-gateway.properties.template b/src/main/resources/toree-gateway.properties.template
new file mode 100644
index 0000000..67be1ed
--- /dev/null
+++ b/src/main/resources/toree-gateway.properties.template
@@ -0,0 +1,29 @@
+#
+# (C) Copyright IBM Corp. 2017
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+[general]
+gateway.home=/opt/toree-gateway
+
+spark.master=spark://9.30.137.220:7077
+#spark.cpus=4
+#spark.memory=1g
+
+toree.ip=9.30.137.220
+toree.username=spark
+toree.password=whoopi
+toree.home=/u/home/SPARK/toree
+
diff --git a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
index 07ba94a..e40050f 100644
--- a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
+++ b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
@@ -95,7 +95,7 @@ object ToreeGatewayClient extends App {
def getConfigurationFilePath: String = {
var filePath = "/opt/toree-gateway/conf/profile.json"
- if (args.length == 0) {
+ if (args.length > 0) {
for (arg <- args) {
if (arg.contains("json")) {
filePath = arg