aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuciano Resende <lresende@apache.org>2017-01-24 16:04:39 -0800
committerLuciano Resende <lresende@apache.org>2017-01-24 16:04:39 -0800
commit1591f422d0e286caa74add358ff6615fbeb9e6bf (patch)
tree328d9f4d1cdb8f37a5cabd89b29893a2c47e1fe9
parentf39ec3fc562a080f41b52e5a7adb87974388adc4 (diff)
downloadtoree-gateway-1591f422d0e286caa74add358ff6615fbeb9e6bf.tar.gz
toree-gateway-1591f422d0e286caa74add358ff6615fbeb9e6bf.tar.bz2
toree-gateway-1591f422d0e286caa74add358ff6615fbeb9e6bf.zip
Enable running multiple notebooks concurrently
There was port conflicts starting py4j when a new notebook instance was started. We now specify the py4j ports to use in profile.json and use those to avoid conflicts.
-rw-r--r--python/toree_gateway_kernel.py11
-rw-r--r--src/main/resources/profiles/kernel-1/profile.json4
-rw-r--r--src/main/resources/profiles/kernel-2/profile.json4
-rw-r--r--src/main/resources/profiles/kernel-3/profile.json4
-rw-r--r--src/main/scala/org/apache/toree/gateway/ToreeGateway.scala11
5 files changed, 27 insertions, 7 deletions
diff --git a/python/toree_gateway_kernel.py b/python/toree_gateway_kernel.py
index e9a3355..c131048 100644
--- a/python/toree_gateway_kernel.py
+++ b/python/toree_gateway_kernel.py
@@ -24,7 +24,7 @@ from os import O_NONBLOCK, read
from fcntl import fcntl, F_GETFL, F_SETFL
from subprocess import Popen, PIPE
from metakernel import MetaKernel
-from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import
+from py4j.java_gateway import JavaGateway, GatewayParameters, CallbackServerParameters, java_import
from py4j.protocol import Py4JError
from config import *
@@ -96,10 +96,15 @@ class ToreeGatewayKernel(MetaKernel):
]
self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE)
- time.sleep(2)
+ time.sleep(5)
+
+ config = self.toreeProfile.config()
+ print('Creating py4j gateway using port:{} '.format(config['py4j_java']))
+
self.gateway = JavaGateway(
+ gateway_parameters=GatewayParameters(port=config['py4j_java']),
start_callback_server=True,
- callback_server_parameters=CallbackServerParameters())
+ callback_server_parameters=CallbackServerParameters(port=config['py4j_python']))
#flags = fcntl(self.gateway_proc.stdout, fcntl.F_GETFL) # get current p.stdout flags
#fcntl(self.gateway_proc.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
diff --git a/src/main/resources/profiles/kernel-1/profile.json b/src/main/resources/profiles/kernel-1/profile.json
index 3d65897..1891b65 100644
--- a/src/main/resources/profiles/kernel-1/profile.json
+++ b/src/main/resources/profiles/kernel-1/profile.json
@@ -7,5 +7,7 @@
"ip": "9.30.137.220",
"transport": "tcp",
"signature_scheme": "hmac-sha256",
- "key": ""
+ "key": "",
+ "py4j_java": 25333,
+ "py4j_python": 25334
} \ No newline at end of file
diff --git a/src/main/resources/profiles/kernel-2/profile.json b/src/main/resources/profiles/kernel-2/profile.json
index 0953896..330156b 100644
--- a/src/main/resources/profiles/kernel-2/profile.json
+++ b/src/main/resources/profiles/kernel-2/profile.json
@@ -7,5 +7,7 @@
"ip": "9.30.137.220",
"transport": "tcp",
"signature_scheme": "hmac-sha256",
- "key": ""
+ "key": "",
+ "py4j_java": 25335,
+ "py4j_python": 25336
} \ No newline at end of file
diff --git a/src/main/resources/profiles/kernel-3/profile.json b/src/main/resources/profiles/kernel-3/profile.json
index 72113b1..1fa422c 100644
--- a/src/main/resources/profiles/kernel-3/profile.json
+++ b/src/main/resources/profiles/kernel-3/profile.json
@@ -7,5 +7,7 @@
"ip": "9.30.137.220",
"transport": "tcp",
"signature_scheme": "hmac-sha256",
- "key": ""
+ "key": "",
+ "py4j_java": 25337,
+ "py4j_python": 25338
} \ No newline at end of file
diff --git a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
index e40050f..22fbd4a 100644
--- a/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
+++ b/src/main/scala/org/apache/toree/gateway/ToreeGateway.scala
@@ -31,6 +31,8 @@ import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
import org.slf4j.{Logger, LoggerFactory}
+import play.api.libs.json._
+
import scala.util.Try
class ToreeGateway(client: SparkKernelClient) {
@@ -124,6 +126,13 @@ object ToreeGatewayClient extends App {
val toreeGateway = new ToreeGateway(client)
- val gatewayServer: GatewayServer = new GatewayServer(toreeGateway)
+ val jsonValue = Json.parse(configFileContent)
+ val port = (jsonValue \ "py4j_java").as[Int]
+
+ if(log.isDebugEnabled()) {
+ log.debug(">>> Starting GatewayServer with port " + port)
+ }
+
+ val gatewayServer: GatewayServer = new GatewayServer(toreeGateway, port)
gatewayServer.start()
}