1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
/*
* (C) Copyright IBM Corp. 2017
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.ibm
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.toree.kernel.protocol.v5.client.boot.ClientBootstrap
import org.apache.toree.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import org.scalatest.{FlatSpec, Ignore}
import org.slf4j.LoggerFactory
@Ignore
class ToreeGatewaySpec extends FlatSpec {
final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
val profileJSON: String = """
{
"stdin_port": 48701,
"control_port": 48702,
"hb_port": 48705,
"shell_port": 48703,
"iopub_port": 48704,
"ip": "9.125.72.72",
"transport": "tcp",
"signature_scheme": "hmac-sha256",
"key": ""
}
""".stripMargin
val toreeGateway = {
// Parse our configuration and create a client connecting to our kernel
val config: Config = ConfigFactory.parseString(profileJSON)
val client = (new ClientBootstrap(config)
with StandardSystemInitialization
with StandardHandlerInitialization).createClient()
val toreeGateway = new ToreeGateway(client)
toreeGateway
}
"gateway" should "receive dataframe show results" in {
val result = toreeGateway.eval(
"""
import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
val sqc = spark.sqlContext
import sqc.implicits._
val bankText = sc.parallelize(
IOUtils.toString(
new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
Charset.forName("utf8")).split("\n"))
case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)
val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
s => Bank(s(0).toInt,
s(1).replaceAll("\"", ""),
s(2).replaceAll("\"", ""),
s(3).replaceAll("\"", ""),
s(5).replaceAll("\"", "").toInt
)
).toDF()
bank.show(1)
""".stripMargin
).toString.stripMargin
assert(result.contains("only showing top 1 row"))
}
"gateway" should "receive error messages when exception is thrown" in {
val result = toreeGateway.eval(
"""
println(1/0)
""".stripMargin
).toString.stripMargin
assert(result.contains("java.lang.ArithmeticException"))
}
}
|