-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 41
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala |  2
3 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index b070fa8eaa..cc07db827d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -25,11 +25,15 @@ import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.Logging
import org.apache.spark.util.Utils
+/**
+ * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
+ * Hive metastore and warehouse.
+ */
class CliSuite extends FunSuite with BeforeAndAfter with Logging {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
@@ -58,13 +62,13 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
| --master local
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
- | --driver-class-path ${sys.props("java.class.path")}
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}
var next = 0
val foundAllExpectedAnswers = Promise.apply[Unit]()
- val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
+ // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+ val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
val buffer = new ArrayBuffer[String]()
val lock = new Object
@@ -124,7 +128,7 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test;"
- -> "Time taken: "
+ -> "OK"
)
}
@@ -151,4 +155,33 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
-> "hive_test"
)
}
+
+ test("Commands using SerDe provided in --jars") {
+ val jarFile =
+ "../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
+ .split("/")
+ .mkString(File.separator)
+
+ val dataFilePath =
+ Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+ runCliWithin(1.minute, Seq("--jars", s"$jarFile"))(
+ """CREATE TABLE t1(key string, val string)
+ |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
+ """.stripMargin
+ -> "OK",
+ "CREATE TABLE sourceTable (key INT, val STRING);"
+ -> "OK",
+ s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
+ -> "OK",
+ "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
+ -> "Time taken:",
+ "SELECT count(key) FROM t1;"
+ -> "5",
+ "DROP TABLE t1;"
+ -> "OK",
+ "DROP TABLE sourceTable;"
+ -> "OK"
+ )
+ }
}
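
Why the queryStream change above matters: joining the statements with mkString("\n") leaves the final statement without a trailing newline, so the CLI never receives the ENTER that submits it. A minimal standalone sketch (not part of the patch) of the difference:

  val queries = Seq("SHOW TABLES;", "SELECT 1;")

  // Old behaviour: the last statement is never terminated with a newline.
  queries.mkString("\n")           // "SHOW TABLES;\nSELECT 1;"

  // New behaviour: every statement gets an explicit ENTER.
  queries.map(_ + "\n").mkString   // "SHOW TABLES;\nSELECT 1;\n"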
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b64768abab..9ab98fdcce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.net.{URL, URLClassLoader}
import java.sql.Timestamp
import java.util.{ArrayList => JArrayList}
@@ -25,7 +26,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.spark.sql.catalyst.ParserDialect
import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.language.implicitConversions
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -188,8 +189,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
"Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
}
- val jars = getClass.getClassLoader match {
- case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
+ // We recursively add all jars in the class loader chain,
+ // starting from the given urlClassLoader.
+ def addJars(urlClassLoader: URLClassLoader): Array[URL] = {
+ val jarsInParent = urlClassLoader.getParent match {
+ case parent: URLClassLoader => addJars(parent)
+ case other => Array.empty[URL]
+ }
+
+ urlClassLoader.getURLs ++ jarsInParent
+ }
+
+ val jars = Utils.getContextOrSparkClassLoader match {
+ case urlClassLoader: URLClassLoader => addJars(urlClassLoader)
case other =>
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 0b6f7a334a..294fc3bd7d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -79,7 +79,7 @@ class HadoopTableReader(
makeRDDForTable(
hiveTable,
Class.forName(
- relation.tableDesc.getSerdeClassName, true, Utils.getSparkClassLoader)
+ relation.tableDesc.getSerdeClassName, true, Utils.getContextOrSparkClassLoader)
.asInstanceOf[Class[Deserializer]],
filterOpt = None)
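
Similarly, the TableReader change resolves the SerDe class through the thread's context class loader when one is set, falling back to Spark's own loader, so a SerDe shipped with --jars can be found. A rough, illustrative equivalent of that fallback (the real helper is Utils.getContextOrSparkClassLoader; the names below exist only for this sketch):

  // Prefer the thread's context class loader; otherwise fall back to the loader
  // that loaded this class (standing in for Spark's class loader here).
  def contextOrDefaultLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // Loading the SerDe through it, as the patched line does:
  // val serdeClass = Class.forName(serdeClassName, true, contextOrDefaultLoader)
  //   .asInstanceOf[Class[Deserializer]]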