diff options
author | Michael Armbrust <michael@databricks.com> | 2014-07-25 15:36:57 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-25 15:36:57 -0700 |
commit | afd757a241f41d7f8c458ef8f1f9ce8ed12986e5 (patch) | |
tree | 136e49b3392763acd5f952b70a50137cb96e9c75 /sql/hive-thriftserver/src/test | |
parent | 37ad3b724590dcf42bcdbfaf91b7a11914501945 (diff) | |
download | spark-afd757a241f41d7f8c458ef8f1f9ce8ed12986e5.tar.gz spark-afd757a241f41d7f8c458ef8f1f9ce8ed12986e5.tar.bz2 spark-afd757a241f41d7f8c458ef8f1f9ce8ed12986e5.zip |
Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server"
This reverts commit 06dc0d2c6b69c5d59b4d194ced2ac85bfe2e05e2.
#1399 is making Jenkins fail. We should investigate and put this back after its passing tests.
Author: Michael Armbrust <michael@databricks.com>
Closes #1594 from marmbrus/revertJDBC and squashes the following commits:
59748da [Michael Armbrust] Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server"
Diffstat (limited to 'sql/hive-thriftserver/src/test')
4 files changed, 0 insertions, 297 deletions
diff --git a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt deleted file mode 100644 index 850f8014b6..0000000000 --- a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt +++ /dev/null @@ -1,5 +0,0 @@ -238val_238 -86val_86 -311val_311 -27val_27 -165val_165 diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala deleted file mode 100644 index b90670a796..0000000000 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import java.io.{BufferedReader, InputStreamReader, PrintWriter} - -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.sql.hive.test.TestHive - -class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils { - val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli") - val METASTORE_PATH = TestUtils.getMetastorePath("cli") - - override def beforeAll() { - val pb = new ProcessBuilder( - "../../bin/spark-sql", - "--master", - "local", - "--hiveconf", - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true", - "--hiveconf", - "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH) - - process = pb.start() - outputWriter = new PrintWriter(process.getOutputStream, true) - inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) - errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream)) - waitForOutput(inputReader, "spark-sql>") - } - - override def afterAll() { - process.destroy() - process.waitFor() - } - - test("simple commands") { - val dataFilePath = getDataFile("data/files/small_kv.txt") - executeQuery("create table hive_test1(key int, val string);") - executeQuery("load data local inpath '" + dataFilePath+ "' overwrite into table hive_test1;") - executeQuery("cache table hive_test1", "Time taken") - } -} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala deleted file mode 100644 index 59f4952b78..0000000000 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import scala.collection.JavaConversions._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent._ - -import java.io.{BufferedReader, InputStreamReader} -import java.sql.{Connection, DriverManager, Statement} - -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.sql.Logging -import org.apache.spark.sql.catalyst.util.getTempFilePath - -/** - * Test for the HiveThriftServer2 using JDBC. - */ -class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUtils with Logging { - - val WAREHOUSE_PATH = getTempFilePath("warehouse") - val METASTORE_PATH = getTempFilePath("metastore") - - val DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver" - val TABLE = "test" - // use a different port, than the hive standard 10000, - // for tests to avoid issues with the port being taken on some machines - val PORT = "10000" - - // If verbose is true, the test program will print all outputs coming from the Hive Thrift server. - val VERBOSE = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean - - Class.forName(DRIVER_NAME) - - override def beforeAll() { launchServer() } - - override def afterAll() { stopServer() } - - private def launchServer(args: Seq[String] = Seq.empty) { - // Forking a new process to start the Hive Thrift server. The reason to do this is it is - // hard to clean up Hive resources entirely, so we just start a new process and kill - // that process for cleanup. - val defaultArgs = Seq( - "../../sbin/start-thriftserver.sh", - "--master local", - "--hiveconf", - "hive.root.logger=INFO,console", - "--hiveconf", - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true", - "--hiveconf", - s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH") - val pb = new ProcessBuilder(defaultArgs ++ args) - process = pb.start() - inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) - errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream)) - waitForOutput(inputReader, "ThriftBinaryCLIService listening on") - - // Spawn a thread to read the output from the forked process. - // Note that this is necessary since in some configurations, log4j could be blocked - // if its output to stderr are not read, and eventually blocking the entire test suite. - future { - while (true) { - val stdout = readFrom(inputReader) - val stderr = readFrom(errorReader) - if (VERBOSE && stdout.length > 0) { - println(stdout) - } - if (VERBOSE && stderr.length > 0) { - println(stderr) - } - Thread.sleep(50) - } - } - } - - private def stopServer() { - process.destroy() - process.waitFor() - } - - test("test query execution against a Hive Thrift server") { - Thread.sleep(5 * 1000) - val dataFilePath = getDataFile("data/files/small_kv.txt") - val stmt = createStatement() - stmt.execute("DROP TABLE IF EXISTS test") - stmt.execute("DROP TABLE IF EXISTS test_cached") - stmt.execute("CREATE TABLE test(key int, val string)") - stmt.execute(s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test") - stmt.execute("CREATE TABLE test_cached as select * from test limit 4") - stmt.execute("CACHE TABLE test_cached") - - var rs = stmt.executeQuery("select count(*) from test") - rs.next() - assert(rs.getInt(1) === 5) - - rs = stmt.executeQuery("select count(*) from test_cached") - rs.next() - assert(rs.getInt(1) === 4) - - stmt.close() - } - - def getConnection: Connection = { - val connectURI = s"jdbc:hive2://localhost:$PORT/" - DriverManager.getConnection(connectURI, System.getProperty("user.name"), "") - } - - def createStatement(): Statement = getConnection.createStatement() -} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala deleted file mode 100644 index bb2242618f..0000000000 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import java.io.{BufferedReader, PrintWriter} -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.hive.common.LogUtils -import org.apache.hadoop.hive.common.LogUtils.LogInitializationException - -object TestUtils { - val timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss") - - def getWarehousePath(prefix: String): String = { - System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-warehouse-" + - timestamp.format(new Date) - } - - def getMetastorePath(prefix: String): String = { - System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-metastore-" + - timestamp.format(new Date) - } - - // Dummy function for initialize the log4j properties. - def init() { } - - // initialize log4j - try { - LogUtils.initHiveLog4j() - } catch { - case e: LogInitializationException => // Ignore the error. - } -} - -trait TestUtils { - var process : Process = null - var outputWriter : PrintWriter = null - var inputReader : BufferedReader = null - var errorReader : BufferedReader = null - - def executeQuery( - cmd: String, outputMessage: String = "OK", timeout: Long = 15000): String = { - println("Executing: " + cmd + ", expecting output: " + outputMessage) - outputWriter.write(cmd + "\n") - outputWriter.flush() - waitForQuery(timeout, outputMessage) - } - - protected def waitForQuery(timeout: Long, message: String): String = { - if (waitForOutput(errorReader, message, timeout)) { - Thread.sleep(500) - readOutput() - } else { - assert(false, "Didn't find \"" + message + "\" in the output:\n" + readOutput()) - null - } - } - - // Wait for the specified str to appear in the output. - protected def waitForOutput( - reader: BufferedReader, str: String, timeout: Long = 10000): Boolean = { - val startTime = System.currentTimeMillis - var out = "" - while (!out.contains(str) && System.currentTimeMillis < (startTime + timeout)) { - out += readFrom(reader) - } - out.contains(str) - } - - // Read stdout output and filter out garbage collection messages. - protected def readOutput(): String = { - val output = readFrom(inputReader) - // Remove GC Messages - val filteredOutput = output.lines.filterNot(x => x.contains("[GC") || x.contains("[Full GC")) - .mkString("\n") - filteredOutput - } - - protected def readFrom(reader: BufferedReader): String = { - var out = "" - var c = 0 - while (reader.ready) { - c = reader.read() - out += c.asInstanceOf[Char] - } - out - } - - protected def getDataFile(name: String) = { - Thread.currentThread().getContextClassLoader.getResource(name) - } -} |