aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive-thriftserver/src/test')
-rw-r--r--sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt5
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala57
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala135
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala108
4 files changed, 305 insertions, 0 deletions
diff --git a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
new file mode 100644
index 0000000000..850f8014b6
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
@@ -0,0 +1,5 @@
+238val_238
+86val_86
+311val_311
+27val_27
+165val_165
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
new file mode 100644
index 0000000000..69f19f826a
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.io.{BufferedReader, InputStreamReader, PrintWriter}
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
+ val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli")
+ val METASTORE_PATH = TestUtils.getMetastorePath("cli")
+
+ override def beforeAll() {
+ val pb = new ProcessBuilder(
+ "../../bin/spark-sql",
+ "--master",
+ "local",
+ "--hiveconf",
+ s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
+ "--hiveconf",
+ "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)
+
+ process = pb.start()
+ outputWriter = new PrintWriter(process.getOutputStream, true)
+ inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
+ errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
+ waitForOutput(inputReader, "spark-sql>")
+ }
+
+ override def afterAll() {
+ process.destroy()
+ process.waitFor()
+ }
+
+ test("simple commands") {
+ val dataFilePath = getDataFile("data/files/small_kv.txt")
+ executeQuery("create table hive_test1(key int, val string);")
+ executeQuery("load data local inpath '" + dataFilePath+ "' overwrite into table hive_test1;")
+ executeQuery("cache table hive_test1", "Time taken")
+ }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
new file mode 100644
index 0000000000..fe3403b329
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent._
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.ServerSocket
+import java.sql.{Connection, DriverManager, Statement}
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+
+/**
+ * Test for the HiveThriftServer2 using JDBC.
+ */
+class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUtils with Logging {
+
+ val WAREHOUSE_PATH = getTempFilePath("warehouse")
+ val METASTORE_PATH = getTempFilePath("metastore")
+
+ val DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"
+ val TABLE = "test"
+ val HOST = "localhost"
+ val PORT = {
+ // Let the system to choose a random available port to avoid collision with other parallel
+ // builds.
+ val socket = new ServerSocket(0)
+ val port = socket.getLocalPort
+ socket.close()
+ port
+ }
+
+ // If verbose is true, the test program will print all outputs coming from the Hive Thrift server.
+ val VERBOSE = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean
+
+ Class.forName(DRIVER_NAME)
+
+ override def beforeAll() { launchServer() }
+
+ override def afterAll() { stopServer() }
+
+ private def launchServer(args: Seq[String] = Seq.empty) {
+ // Forking a new process to start the Hive Thrift server. The reason to do this is it is
+ // hard to clean up Hive resources entirely, so we just start a new process and kill
+ // that process for cleanup.
+ val defaultArgs = Seq(
+ "../../sbin/start-thriftserver.sh",
+ "--master local",
+ "--hiveconf",
+ "hive.root.logger=INFO,console",
+ "--hiveconf",
+ s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
+ "--hiveconf",
+ s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
+ val pb = new ProcessBuilder(defaultArgs ++ args)
+ val environment = pb.environment()
+ environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString)
+ environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST)
+ process = pb.start()
+ inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
+ errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
+ waitForOutput(inputReader, "ThriftBinaryCLIService listening on")
+
+ // Spawn a thread to read the output from the forked process.
+ // Note that this is necessary since in some configurations, log4j could be blocked
+ // if its output to stderr are not read, and eventually blocking the entire test suite.
+ future {
+ while (true) {
+ val stdout = readFrom(inputReader)
+ val stderr = readFrom(errorReader)
+ if (VERBOSE && stdout.length > 0) {
+ println(stdout)
+ }
+ if (VERBOSE && stderr.length > 0) {
+ println(stderr)
+ }
+ Thread.sleep(50)
+ }
+ }
+ }
+
+ private def stopServer() {
+ process.destroy()
+ process.waitFor()
+ }
+
+ test("test query execution against a Hive Thrift server") {
+ Thread.sleep(5 * 1000)
+ val dataFilePath = getDataFile("data/files/small_kv.txt")
+ val stmt = createStatement()
+ stmt.execute("DROP TABLE IF EXISTS test")
+ stmt.execute("DROP TABLE IF EXISTS test_cached")
+ stmt.execute("CREATE TABLE test(key int, val string)")
+ stmt.execute(s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test")
+ stmt.execute("CREATE TABLE test_cached as select * from test limit 4")
+ stmt.execute("CACHE TABLE test_cached")
+
+ var rs = stmt.executeQuery("select count(*) from test")
+ rs.next()
+ assert(rs.getInt(1) === 5)
+
+ rs = stmt.executeQuery("select count(*) from test_cached")
+ rs.next()
+ assert(rs.getInt(1) === 4)
+
+ stmt.close()
+ }
+
+ def getConnection: Connection = {
+ val connectURI = s"jdbc:hive2://localhost:$PORT/"
+ DriverManager.getConnection(connectURI, System.getProperty("user.name"), "")
+ }
+
+ def createStatement(): Statement = getConnection.createStatement()
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
new file mode 100644
index 0000000000..bb2242618f
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.io.{BufferedReader, PrintWriter}
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.hive.common.LogUtils
+import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
+
+object TestUtils {
+ val timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss")
+
+ def getWarehousePath(prefix: String): String = {
+ System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-warehouse-" +
+ timestamp.format(new Date)
+ }
+
+ def getMetastorePath(prefix: String): String = {
+ System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-metastore-" +
+ timestamp.format(new Date)
+ }
+
+ // Dummy function for initialize the log4j properties.
+ def init() { }
+
+ // initialize log4j
+ try {
+ LogUtils.initHiveLog4j()
+ } catch {
+ case e: LogInitializationException => // Ignore the error.
+ }
+}
+
+trait TestUtils {
+ var process : Process = null
+ var outputWriter : PrintWriter = null
+ var inputReader : BufferedReader = null
+ var errorReader : BufferedReader = null
+
+ def executeQuery(
+ cmd: String, outputMessage: String = "OK", timeout: Long = 15000): String = {
+ println("Executing: " + cmd + ", expecting output: " + outputMessage)
+ outputWriter.write(cmd + "\n")
+ outputWriter.flush()
+ waitForQuery(timeout, outputMessage)
+ }
+
+ protected def waitForQuery(timeout: Long, message: String): String = {
+ if (waitForOutput(errorReader, message, timeout)) {
+ Thread.sleep(500)
+ readOutput()
+ } else {
+ assert(false, "Didn't find \"" + message + "\" in the output:\n" + readOutput())
+ null
+ }
+ }
+
+ // Wait for the specified str to appear in the output.
+ protected def waitForOutput(
+ reader: BufferedReader, str: String, timeout: Long = 10000): Boolean = {
+ val startTime = System.currentTimeMillis
+ var out = ""
+ while (!out.contains(str) && System.currentTimeMillis < (startTime + timeout)) {
+ out += readFrom(reader)
+ }
+ out.contains(str)
+ }
+
+ // Read stdout output and filter out garbage collection messages.
+ protected def readOutput(): String = {
+ val output = readFrom(inputReader)
+ // Remove GC Messages
+ val filteredOutput = output.lines.filterNot(x => x.contains("[GC") || x.contains("[Full GC"))
+ .mkString("\n")
+ filteredOutput
+ }
+
+ protected def readFrom(reader: BufferedReader): String = {
+ var out = ""
+ var c = 0
+ while (reader.ready) {
+ c = reader.read()
+ out += c.asInstanceOf[Char]
+ }
+ out
+ }
+
+ protected def getDataFile(name: String) = {
+ Thread.currentThread().getContextClassLoader.getResource(name)
+ }
+}