blob: c9dedcf0545499e71d0a06c5773217d76dc03f71 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
package com.ibm
import com.typesafe.config.ConfigFactory
import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap
import com.typesafe.config.Config
import org.apache.toree.kernel.protocol.v5.MIMEType
import org.apache.toree.kernel.protocol.v5.client.SparkKernelClient
import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import org.apache.toree.kernel.protocol.v5.client.execution.DeferredExecution
import org.apache.toree.kernel.protocol.v5.content.ExecuteResult
import py4j.GatewayServer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
import org.slf4j.LoggerFactory
class ToreeGateway(client: SparkKernelClient) {
// Define our callback
def printResult(result: ExecuteResult) = {
println(s"Result was: ${result.data.get(MIMEType.PlainText).get}")
}
def eval(code: String): Object = {
val promise = Promise[String]
val exRes: DeferredExecution = client.execute(code).onResult(er => {
promise.success(er.data.get(MIMEType.PlainText).get)
})
Await.result(promise.future, Duration.Inf)
}
}
object ToreeClient extends App {
final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
def getConfigurationFilePath: String = {
var filePath = "/opt/toree_proxy/conf/profile.json"
if (args.length == 0) {
for (arg <- args) {
if (arg.contains("json")) {
filePath = arg
}
}
}
filePath
}
log.info("Application Initialized from " + new java.io.File(".").getCanonicalPath)
log.info("With the following parameters:" )
if (args.length == 0 ) {
log.info(">>> NONE" )
} else {
for (arg <- args) {
log.info(">>> Arg :" + arg )
}
}
// Parse our configuration and create a client connecting to our kernel
val configFileContent = scala.io.Source.fromFile(getConfigurationFilePath).mkString
log.info(">>> Configuration in use " + configFileContent)
val config: Config = ConfigFactory.parseString(configFileContent)
val client = (new ClientBootstrap(config)
with StandardSystemInitialization
with StandardHandlerInitialization).createClient()
val toreeGateway = new ToreeGateway(client)
val gatewayServer: GatewayServer = new GatewayServer(toreeGateway)
gatewayServer.start()
}
|