aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBrian Burns <brian.p.burns@gmail.com>2017-01-08 14:40:49 -0500
committerBrian Burns <brian.p.burns@gmail.com>2017-01-08 14:40:49 -0500
commit22099a4b1c7c5f1f225b5cc1b257855d558d1905 (patch)
treec12bc01982721fddddbb6935efa5cc4d3162dfc8 /src
downloadtoree-gateway-22099a4b1c7c5f1f225b5cc1b257855d558d1905.tar.gz
toree-gateway-22099a4b1c7c5f1f225b5cc1b257855d558d1905.tar.bz2
toree-gateway-22099a4b1c7c5f1f225b5cc1b257855d558d1905.zip
initial commit
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/com/ibm/ToreeClient.scala60
1 files changed, 60 insertions, 0 deletions
diff --git a/src/main/scala/com/ibm/ToreeClient.scala b/src/main/scala/com/ibm/ToreeClient.scala
new file mode 100644
index 0000000..a075f77
--- /dev/null
+++ b/src/main/scala/com/ibm/ToreeClient.scala
@@ -0,0 +1,60 @@
+package com.ibm
+
+import com.typesafe.config.ConfigFactory
+import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap
+import com.typesafe.config.Config
+import org.apache.toree.kernel.protocol.v5.MIMEType
+import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient
+import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
+import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
+import org.apache.toree.kernel.protocol.v5.content.ExecuteResult
+import py4j.GatewayServer
+
+import scala.concurrent.{Await, Promise}
+import scala.concurrent.duration.Duration
+
+class ToreeGateway(client: SparkKernelClient) {
+ // Define our callback
+ def printResult(result: ExecuteResult) = {
+ println(s"Result was: ${result.data.get(MIMEType.PlainText).get}")
+ }
+
+ def eval(code: String): Object = {
+ val promise = Promise[String]
+ val exRes: DeferredExecution = client.execute(code).onResult(er => {
+ promise.success(er.data.get(MIMEType.PlainText).get)
+ })
+
+ Await.result(promise.future, Duration.Inf)
+ }
+}
+
+object ToreeClient extends App {
+
+ val profileJSON: String = """
+{
+ "stdin_port": 48691,
+ "control_port": 44808,
+ "hb_port": 49691,
+ "shell_port": 40544,
+ "iopub_port": 43462,
+ "ip": "127.0.0.1",
+ "transport": "tcp",
+ "signature_scheme": "hmac-sha256",
+ "key": ""
+}
+""".stripMargin
+
+ // Parse our configuration and create a client connecting to our kernel
+ val config: Config = ConfigFactory.parseString(profileJSON)
+
+ val client = (new ClientBootstrap(config)
+ with StandardSystemInitialization
+ with StandardHandlerInitialization).createClient()
+
+ val toreeGateway = new ToreeGateway(client)
+ print(toreeGateway.eval("sc"))
+
+ val gatewayServer: GatewayServer = new GatewayServer(toreeGateway)
+ gatewayServer.start()
+}