// Extracted from a repository web view; navigation text and the line-number gutter were removed.
// path: root/src/main/scala/com/ibm/ToreeClient.scala
// blob: b644b02bf3effeffbaa7cedaf7becc1b801e5053
package com.ibm

import com.typesafe.config.ConfigFactory
import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap
import com.typesafe.config.Config
import org.apache.toree.kernel.protocol.v5.MIMEType
import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient
import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
import org.apache.toree.kernel.protocol.v5.content.ExecuteResult
import py4j.GatewayServer

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

/**
 * Small synchronous facade over a Toree [[SparkKernelClient]].
 *
 * An instance is used as the py4j entry point (see `ToreeClient`), so the methods
 * here are what a remote caller invokes to run code on the kernel.
 *
 * @param client connected kernel client used to submit code for execution
 */
class ToreeGateway(client: SparkKernelClient) {

  /** Returns the `text/plain` rendering carried by `result`, if there is one. */
  private def plainText(result: ExecuteResult): Option[String] =
    result.data.get(MIMEType.PlainText)

  /**
   * Prints the plain-text rendering of an execution result to standard output.
   *
   * A result without a `text/plain` entry is reported with a placeholder instead
   * of failing with `NoSuchElementException`.
   *
   * @param result execution result received from the kernel
   */
  def printResult(result: ExecuteResult): Unit =
    println(s"Result was: ${plainText(result).getOrElse("<no text/plain output>")}")

  /**
   * Submits `code` to the kernel and blocks until an outcome is reported.
   *
   * The wait is unbounded (`Duration.Inf`). The promise is therefore completed on
   * every outcome the client reports through `onResult` or `onError`; otherwise
   * the calling thread would hang forever.
   *
   * NOTE(review): code that yields neither a result nor an error (for example a
   * statement that only writes to a stream) may still never complete the promise.
   * Confirm against the Toree client callback contract whether `onSuccess`
   * should be handled as well.
   *
   * @param code source code to execute on the kernel
   * @return the `text/plain` rendering of the execution result (a `String`)
   * @throws NoSuchElementException if the result carries no `text/plain` entry
   * @throws RuntimeException       if the kernel reports an execution error
   */
  def eval(code: String): Object = {
    val promise = Promise[String]()

    client.execute(code)
      .onResult { result =>
        // try* variants: a repeated callback must not throw IllegalStateException
        // on an already-completed promise.
        plainText(result) match {
          case Some(text) =>
            promise.trySuccess(text)
          case None =>
            promise.tryFailure(
              new NoSuchElementException("Execution result contained no text/plain data"))
        }
      }
      .onError { error =>
        // Surface kernel-side failures to the caller instead of waiting forever.
        promise.tryFailure(
          new RuntimeException(s"Kernel reported an execution error: $error"))
      }

    Await.result(promise.future, Duration.Inf)
  }
}

/**
 * Entry point: connects to a running Toree kernel and exposes a [[ToreeGateway]]
 * to other processes through a py4j `GatewayServer`.
 *
 * Usage:
 *  - no arguments: connect using the built-in `profileJSON` below;
 *  - one argument: path to a kernel connection file with the same keys as
 *    `profileJSON`, used instead of the built-in profile.
 */
object ToreeClient extends App {

  // Built-in connection profile, used when no connection file is given on the
  // command line. The ports and IP identify one specific kernel instance.
  val profileJSON: String = """
{
    "stdin_port":   48691,
    "control_port": 44808,
    "hb_port":      49691,
    "shell_port":   40544,
    "iopub_port":   43462,
    "ip": "9.125.72.72",
    "transport": "tcp",
    "signature_scheme": "hmac-sha256",
    "key": ""
}
""".stripMargin

  // Parse our configuration: an explicit connection file wins over the built-in profile.
  val config: Config = args.headOption match {
    case Some(profilePath) =>
      val profileFile = new java.io.File(profilePath)
      // Check explicitly so that a wrong path fails here, with a clear message,
      // rather than later with a less obvious configuration error.
      require(profileFile.isFile, s"Kernel connection file not found: $profilePath")
      ConfigFactory.parseFile(profileFile)
    case None =>
      ConfigFactory.parseString(profileJSON)
  }

  // Create a client connected to the kernel described by `config`.
  val client = (new ClientBootstrap(config)
    with StandardSystemInitialization
    with StandardHandlerInitialization).createClient()

  val toreeGateway = new ToreeGateway(client)

  // Example of a one-off evaluation without going through py4j:
  //
  //   val code: String =
  //     """
  //       |sc.parallelize(List(1,2,3,4,5)).reduce((a, b) => a + b)
  //     """.stripMargin
  //   print(toreeGateway.eval(code))

  // Expose the gateway object to py4j clients; no port is passed, so py4j's
  // default listening port applies.
  val gatewayServer: GatewayServer = new GatewayServer(toreeGateway)
  gatewayServer.start()
}