From 87fe76dcd9c92c336f65cff3b9511d01f73e7421 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Tue, 17 Jan 2017 22:11:21 -0800 Subject: Use consistent naming pattern around the project --- kernel.json | 4 +- pom.xml | 35 +++---- python/setup.py | 10 +- python/toree_gateway_kernel.py | 143 ++++++++++++++++++++++++++ python/toree_kernel.py | 143 -------------------------- src/main/scala/com/ibm/ToreeClient.scala | 131 ----------------------- src/main/scala/com/ibm/ToreeGateway.scala | 131 +++++++++++++++++++++++ src/test/scala/com/ibm/ToreeClientSpec.scala | 104 ------------------- src/test/scala/com/ibm/ToreeGatewaySpec.scala | 104 +++++++++++++++++++ 9 files changed, 402 insertions(+), 403 deletions(-) create mode 100644 python/toree_gateway_kernel.py delete mode 100644 python/toree_kernel.py delete mode 100644 src/main/scala/com/ibm/ToreeClient.scala create mode 100644 src/main/scala/com/ibm/ToreeGateway.scala delete mode 100644 src/test/scala/com/ibm/ToreeClientSpec.scala create mode 100644 src/test/scala/com/ibm/ToreeGatewaySpec.scala diff --git a/kernel.json b/kernel.json index 5169f26..ab8bb74 100644 --- a/kernel.json +++ b/kernel.json @@ -2,10 +2,10 @@ "language_info": { "name": "scala" }, - "display_name": "Toree Client Proxy Kernel", + "display_name": "Toree Gateway Kernel", "argv": [ "python", - "/opt/toree_proxy/python/toree_proxy_kernel.py", + "/opt/toree_gateway/python/toree_gateway_kernel.py", "-f", "{connection_file}" ], diff --git a/pom.xml b/pom.xml index 7fb8db4..aca308e 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 4.0.0 com.ibm toree-gateway - 0.1 + 1.0 Toree Gateway @@ -66,16 +66,12 @@ org.apache.maven.plugins - maven-surefire-plugin - 2.19.1 + maven-compiler-plugin + 3.3 - -Dlog4j.configuration=file:"./src/test/resources/conf/log4j.prop" - + 1.8 + 1.8 - - - org.apache.maven.plugins - maven-compiler-plugin compile @@ -85,15 +81,7 @@ - - org.apache.maven.plugins - maven-compiler-plugin - 3.3 - - 1.8 - 1.8 - - + org.apache.maven.plugins maven-shade-plugin @@ -126,6 +114,17 @@ + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + -Dlog4j.configuration=file:"./src/test/resources/conf/log4j.prop" + + + + net.alchim31.maven scala-maven-plugin diff --git a/python/setup.py b/python/setup.py index 2ebbaa9..a46d087 100644 --- a/python/setup.py +++ b/python/setup.py @@ -17,15 +17,15 @@ from setuptools import setup -setup(name='toree_proxy_kernel', +setup(name='toree_gateway_kernel', version='0.1', - description='Toree Client Proxy Kernel', - long_description='A simple echo kernel for Jupyter/IPython, based on MetaKernel', - py_modules=['toree_proxy_kernel'], + description='Toree Gateway', + long_description='A Gateway Kernel for Apache Toree, based on MetaKernel', + py_modules=['toree_gateway_kernel'], install_requires=['metakernel', 'py4j'], classifiers = [ 'Framework :: IPython', - 'License :: OSI Approved :: BSD License', + 'License :: OSI Approved :: Apache License 2.0', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 2', 'Topic :: System :: Shells', diff --git a/python/toree_gateway_kernel.py b/python/toree_gateway_kernel.py new file mode 100644 index 0000000..46b264a --- /dev/null +++ b/python/toree_gateway_kernel.py @@ -0,0 +1,143 @@ +# +# (C) Copyright IBM Corp. 2017 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import signal +import sys +import time +import io +import logging + +from os import O_NONBLOCK, read +from fcntl import fcntl, F_GETFL, F_SETFL +from subprocess import Popen, PIPE +from metakernel import MetaKernel +from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import +from py4j.protocol import Py4JError + +class TextOutput(object): + """Wrapper for text output whose repr is the text itself. + This avoids `repr(output)` adding quotation marks around already-rendered text. + """ + def __init__(self, output): + self.output = output + + def __repr__(self): + return self.output + +class ToreeKernel(MetaKernel): + implementation = 'toree_gateway_kernel' + implementation_version = '1.0' + langauge = 'scala' + language_version = '2.11' + banner = "toree_gateway_kernel" + language_info = {'name': 'scala', + 'mimetype': 'application/scala', + 'file_extension': '.scala'} + + + def __init__(self, **kwargs): + super(ToreeKernel, self).__init__(**kwargs) + self._start_toree_client() + + def sig_handler(signum, frame): + self.gateway_proc.terminate() + + def do_shutdown(self, restart): + super(ToreeKernel, self).do_shutdown(restart) + self.gateway_proc.terminate() + + def _start_toree_client(self): + args = [ + "java", + "-classpath", + "/opt/toree_proxy/lib/toree-gateway-1.0-jar-with-dependencies.jar", + "com.ibm.ToreeGateway" + ] + + self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE) + time.sleep(1.5) + self.gateway = JavaGateway( + start_callback_server=True, + callback_server_parameters=CallbackServerParameters()) + + flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags + fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK) + + flags = fcntl(self.gateway_proc.stderr, F_GETFL) # get current p.stdout flags + fcntl(self.gateway_proc.stderr, F_SETFL, flags | O_NONBLOCK) + + signal.signal(signal.SIGTERM, self.sig_handler) + signal.signal(signal.SIGINT, self.sig_handler) + signal.signal(signal.SIGHUP, self.sig_handler) + + + def Error(self, output): + if not output: + return + + super(ToreeKernel, self).Error(output) + + def handle_output(self, fd, fn): + stringIO = io.StringIO() + while True: + try: + b = read(fd.fileno(), 1024) + if b: + stringIO.write(b.decode('utf-8')) + except OSError: + break + + s = stringIO.getvalue() + if s: + fn(s.strip()) + + stringIO.close() + + def do_execute_direct(self, code, silent=False): + """ + :param code: + The code to be executed. + :param silent: + Whether to display output. + :return: + Return value, or None + + MetaKernel code handler. + """ + + if not code.strip(): + return None + + retval = None + try: + retval = self.gateway.entry_point.eval(code.rstrip()) + self.handle_output(self.gateway_proc.stdout, self.Print) + self.handle_output(self.gateway_proc.stderr, self.Error) + except Py4JError as e: + if not silent: + self.Error(format(e)) + + if retval is None: + return + elif isinstance(retval, str): + return TextOutput(retval) + else: + return retval + +if __name__ == '__main__': + ToreeKernel.run_as_main() + diff --git a/python/toree_kernel.py b/python/toree_kernel.py deleted file mode 100644 index 4b6266e..0000000 --- a/python/toree_kernel.py +++ /dev/null @@ -1,143 +0,0 @@ -# -# (C) Copyright IBM Corp. 2017 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import signal -import sys -import time -import io -import logging - -from os import O_NONBLOCK, read -from fcntl import fcntl, F_GETFL, F_SETFL -from subprocess import Popen, PIPE -from metakernel import MetaKernel -from py4j.java_gateway import JavaGateway, CallbackServerParameters, java_import -from py4j.protocol import Py4JError - -class TextOutput(object): - """Wrapper for text output whose repr is the text itself. - This avoids `repr(output)` adding quotation marks around already-rendered text. - """ - def __init__(self, output): - self.output = output - - def __repr__(self): - return self.output - -class ToreeKernel(MetaKernel): - implementation = 'toree_kernel' - implementation_version = '0.1' - langauge = 'scala' - language_version = '2.11' - banner = "toree_kernel" - language_info = {'name': 'scala', - 'mimetype': 'application/scala', - 'file_extension': '.scala'} - - - def __init__(self, **kwargs): - super(ToreeKernel, self).__init__(**kwargs) - self._start_toree_client() - - def sig_handler(signum, frame): - self.gateway_proc.terminate() - - def do_shutdown(self, restart): - super(ToreeKernel, self).do_shutdown(restart) - self.gateway_proc.terminate() - - def _start_toree_client(self): - args = [ - "java", - "-classpath", - "/opt/toree_proxy/lib/toree-gateway-0.1-jar-with-dependencies.jar", - "com.ibm.ToreeClient" - ] - - self.gateway_proc = Popen(args, stderr=PIPE, stdout=PIPE) - time.sleep(1.5) - self.gateway = JavaGateway( - start_callback_server=True, - callback_server_parameters=CallbackServerParameters()) - - flags = fcntl(self.gateway_proc.stdout, F_GETFL) # get current p.stdout flags - fcntl(self.gateway_proc.stdout, F_SETFL, flags | O_NONBLOCK) - - flags = fcntl(self.gateway_proc.stderr, F_GETFL) # get current p.stdout flags - fcntl(self.gateway_proc.stderr, F_SETFL, flags | O_NONBLOCK) - - signal.signal(signal.SIGTERM, self.sig_handler) - signal.signal(signal.SIGINT, self.sig_handler) - signal.signal(signal.SIGHUP, self.sig_handler) - - - def Error(self, output): - if not output: - return - - super(ToreeKernel, self).Error(output) - - def handle_output(self, fd, fn): - stringIO = io.StringIO() - while True: - try: - b = read(fd.fileno(), 1024) - if b: - stringIO.write(b.decode('utf-8')) - except OSError: - break - - s = stringIO.getvalue() - if s: - fn(s.strip()) - - stringIO.close() - - def do_execute_direct(self, code, silent=False): - """ - :param code: - The code to be executed. - :param silent: - Whether to display output. - :return: - Return value, or None - - MetaKernel code handler. - """ - - if not code.strip(): - return None - - retval = None - try: - retval = self.gateway.entry_point.eval(code.rstrip()) - self.handle_output(self.gateway_proc.stdout, self.Print) - self.handle_output(self.gateway_proc.stderr, self.Error) - except Py4JError as e: - if not silent: - self.Error(format(e)) - - if retval is None: - return - elif isinstance(retval, str): - return TextOutput(retval) - else: - return retval - -if __name__ == '__main__': - ToreeKernel.run_as_main() - diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala deleted file mode 100644 index d1a8904..0000000 --- a/src/main/scala/com/ibm/ToreeClient.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * (C) Copyright IBM Corp. 2017 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.ibm - - -import com.typesafe.config.ConfigFactory -import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap -import com.typesafe.config.Config -import org.apache.toree.kernel.protocol.v5.MIMEType -import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient -import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} -import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution -import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult, StreamContent} -import py4j.GatewayServer - -import scala.concurrent.{Await, Promise} -import scala.concurrent.duration.Duration -import org.slf4j.{Logger, LoggerFactory} - -import scala.util.Try - -class ToreeGateway(client: SparkKernelClient) { - final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) - - private def handleResult(promise:Promise[String], result: ExecuteResult) = { - log.warn(s"Result was: ${result.data(MIMEType.PlainText)}") - // promise.success(result.data(MIMEType.PlainText)) - promise.success(result.content) - - } - - private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { - log.warn(s"Successful code completion") - if(! promise.isCompleted) { - promise.complete(Try("done")) - } - } - - private def handleError(promise:Promise[String], reply:ExecuteReplyError) { - log.warn(s"Error was: ${reply.ename.get}") - //promise.failure(new Throwable("Error evaluating paragraph: " + reply.content)) - promise.success(reply.status + ":" + reply.ename.getOrElse("") + " - " + reply.evalue.getOrElse("")) - } - - private def handleStream(promise:Promise[String], content: StreamContent) { - log.warn(s"Received streaming content ${content.name} was: ${content.text}") - promise.success(content.text) - } - - def eval(code: String): Object = { - val promise = Promise[String] - try { - val exRes: DeferredExecution = client.execute(code) - .onResult(executeResult => { - handleResult(promise, executeResult) - }).onError(executeReplyError =>{ - handleError(promise, executeReplyError) - }).onSuccess(executeReplyOk => { - handleSuccess(promise, executeReplyOk) - }).onStream(streamResult => { - handleStream(promise, streamResult) - }) - - } catch { - case t : Throwable => { - log.info("Error proxying request: " + t.getMessage, t) - promise.success("Error proxying request: " + t.getMessage) - } - } - - Await.result(promise.future, Duration.Inf) - } -} - -object ToreeClient extends App { - - final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) - - def getConfigurationFilePath: String = { - var filePath = "/opt/toree_proxy/conf/profile.json" - - if (args.length == 0) { - for (arg <- args) { - if (arg.contains("json")) { - filePath = arg - } - } - } - - filePath - } - - log.info("Application Initialized from " + new java.io.File(".").getCanonicalPath) - log.info("With the following parameters:" ) - if (args.length == 0 ) { - log.info(">>> NONE" ) - } else { - for (arg <- args) { - log.info(">>> Arg :" + arg ) - } - } - - // Parse our configuration and create a client connecting to our kernel - val configFileContent = scala.io.Source.fromFile(getConfigurationFilePath).mkString - log.info(">>> Configuration in use " + configFileContent) - val config: Config = ConfigFactory.parseString(configFileContent) - - val client = (new ClientBootstrap(config) - with StandardSystemInitialization - with StandardHandlerInitialization).createClient() - - val toreeGateway = new ToreeGateway(client) - - val gatewayServer: GatewayServer = new GatewayServer(toreeGateway) - gatewayServer.start() -} diff --git a/src/main/scala/com/ibm/ToreeGateway.scala b/src/main/scala/com/ibm/ToreeGateway.scala new file mode 100644 index 0000000..edaa1e2 --- /dev/null +++ b/src/main/scala/com/ibm/ToreeGateway.scala @@ -0,0 +1,131 @@ +/* + * (C) Copyright IBM Corp. 2017 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.ibm + + +import com.typesafe.config.ConfigFactory +import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap +import com.typesafe.config.Config +import org.apache.toree.kernel.protocol.v5.MIMEType +import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient +import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} +import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution +import org.apache.toree.kernel.protocol.v5.content.{ExecuteReplyError, ExecuteReplyOk, ExecuteResult, StreamContent} +import py4j.GatewayServer + +import scala.concurrent.{Await, Promise} +import scala.concurrent.duration.Duration +import org.slf4j.{Logger, LoggerFactory} + +import scala.util.Try + +class ToreeGateway(client: SparkKernelClient) { + final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) + + private def handleResult(promise:Promise[String], result: ExecuteResult) = { + log.warn(s"Result was: ${result.data(MIMEType.PlainText)}") + // promise.success(result.data(MIMEType.PlainText)) + promise.success(result.content) + + } + + private def handleSuccess(promise:Promise[String], executeReplyOk: ExecuteReplyOk) = { + log.warn(s"Successful code completion") + if(! promise.isCompleted) { + promise.complete(Try("done")) + } + } + + private def handleError(promise:Promise[String], reply:ExecuteReplyError) { + log.warn(s"Error was: ${reply.ename.get}") + //promise.failure(new Throwable("Error evaluating paragraph: " + reply.content)) + promise.success(reply.status + ":" + reply.ename.getOrElse("") + " - " + reply.evalue.getOrElse("")) + } + + private def handleStream(promise:Promise[String], content: StreamContent) { + log.warn(s"Received streaming content ${content.name} was: ${content.text}") + promise.success(content.text) + } + + def eval(code: String): Object = { + val promise = Promise[String] + try { + val exRes: DeferredExecution = client.execute(code) + .onResult(executeResult => { + handleResult(promise, executeResult) + }).onError(executeReplyError =>{ + handleError(promise, executeReplyError) + }).onSuccess(executeReplyOk => { + handleSuccess(promise, executeReplyOk) + }).onStream(streamResult => { + handleStream(promise, streamResult) + }) + + } catch { + case t : Throwable => { + log.info("Error proxying request: " + t.getMessage, t) + promise.success("Error proxying request: " + t.getMessage) + } + } + + Await.result(promise.future, Duration.Inf) + } +} + +object ToreeGatewayClient extends App { + + final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) + + def getConfigurationFilePath: String = { + var filePath = "/opt/toree_proxy/conf/profile.json" + + if (args.length == 0) { + for (arg <- args) { + if (arg.contains("json")) { + filePath = arg + } + } + } + + filePath + } + + log.info("Application Initialized from " + new java.io.File(".").getCanonicalPath) + log.info("With the following parameters:" ) + if (args.length == 0 ) { + log.info(">>> NONE" ) + } else { + for (arg <- args) { + log.info(">>> Arg :" + arg ) + } + } + + // Parse our configuration and create a client connecting to our kernel + val configFileContent = scala.io.Source.fromFile(getConfigurationFilePath).mkString + log.info(">>> Configuration in use " + configFileContent) + val config: Config = ConfigFactory.parseString(configFileContent) + + val client = (new ClientBootstrap(config) + with StandardSystemInitialization + with StandardHandlerInitialization).createClient() + + val toreeGateway = new ToreeGateway(client) + + val gatewayServer: GatewayServer = new GatewayServer(toreeGateway) + gatewayServer.start() +} diff --git a/src/test/scala/com/ibm/ToreeClientSpec.scala b/src/test/scala/com/ibm/ToreeClientSpec.scala deleted file mode 100644 index 859f2f7..0000000 --- a/src/test/scala/com/ibm/ToreeClientSpec.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * (C) Copyright IBM Corp. 2017 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.ibm - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap -import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} -import org.scalatest.{FlatSpec, Ignore} -import org.slf4j.LoggerFactory - -@Ignore -class ToreeClientSpec extends FlatSpec { - - final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) - - - val profileJSON: String = """ - { - "stdin_port": 48701, - "control_port": 48702, - "hb_port": 48705, - "shell_port": 48703, - "iopub_port": 48704, - "ip": "9.125.72.72", - "transport": "tcp", - "signature_scheme": "hmac-sha256", - "key": "" - } - """.stripMargin - - val toreeGateway = { - // Parse our configuration and create a client connecting to our kernel - val config: Config = ConfigFactory.parseString(profileJSON) - - val client = (new ClientBootstrap(config) - with StandardSystemInitialization - with StandardHandlerInitialization).createClient() - - val toreeGateway = new ToreeGateway(client) - - toreeGateway - - } - - "gateway" should "receive dataframe show results" in { - val result = toreeGateway.eval( - """ - import org.apache.commons.io.IOUtils - import java.net.URL - import java.nio.charset.Charset - - val sqc = spark.sqlContext - import sqc.implicits._ - - val bankText = sc.parallelize( - IOUtils.toString( - new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"), - Charset.forName("utf8")).split("\n")) - - case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer) - - val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map( - s => Bank(s(0).toInt, - s(1).replaceAll("\"", ""), - s(2).replaceAll("\"", ""), - s(3).replaceAll("\"", ""), - s(5).replaceAll("\"", "").toInt - ) - ).toDF() - - bank.show(1) - """.stripMargin - ).toString.stripMargin - - assert(result.contains("only showing top 1 row")) - } - - "gateway" should "receive error messages when exception is thrown" in { - val result = toreeGateway.eval( - """ - println(1/0) - """.stripMargin - ).toString.stripMargin - - assert(result.contains("java.lang.ArithmeticException")) - } - - -} diff --git a/src/test/scala/com/ibm/ToreeGatewaySpec.scala b/src/test/scala/com/ibm/ToreeGatewaySpec.scala new file mode 100644 index 0000000..4ea0ada --- /dev/null +++ b/src/test/scala/com/ibm/ToreeGatewaySpec.scala @@ -0,0 +1,104 @@ +/* + * (C) Copyright IBM Corp. 2017 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.ibm + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap +import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} +import org.scalatest.{FlatSpec, Ignore} +import org.slf4j.LoggerFactory + +@Ignore +class ToreeGatewaySpec extends FlatSpec { + + final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) + + + val profileJSON: String = """ + { + "stdin_port": 48701, + "control_port": 48702, + "hb_port": 48705, + "shell_port": 48703, + "iopub_port": 48704, + "ip": "9.125.72.72", + "transport": "tcp", + "signature_scheme": "hmac-sha256", + "key": "" + } + """.stripMargin + + val toreeGateway = { + // Parse our configuration and create a client connecting to our kernel + val config: Config = ConfigFactory.parseString(profileJSON) + + val client = (new ClientBootstrap(config) + with StandardSystemInitialization + with StandardHandlerInitialization).createClient() + + val toreeGateway = new ToreeGateway(client) + + toreeGateway + + } + + "gateway" should "receive dataframe show results" in { + val result = toreeGateway.eval( + """ + import org.apache.commons.io.IOUtils + import java.net.URL + import java.nio.charset.Charset + + val sqc = spark.sqlContext + import sqc.implicits._ + + val bankText = sc.parallelize( + IOUtils.toString( + new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"), + Charset.forName("utf8")).split("\n")) + + case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer) + + val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map( + s => Bank(s(0).toInt, + s(1).replaceAll("\"", ""), + s(2).replaceAll("\"", ""), + s(3).replaceAll("\"", ""), + s(5).replaceAll("\"", "").toInt + ) + ).toDF() + + bank.show(1) + """.stripMargin + ).toString.stripMargin + + assert(result.contains("only showing top 1 row")) + } + + "gateway" should "receive error messages when exception is thrown" in { + val result = toreeGateway.eval( + """ + println(1/0) + """.stripMargin + ).toString.stripMargin + + assert(result.contains("java.lang.ArithmeticException")) + } + + +} -- cgit v1.2.3