diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-09-04 18:47:45 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-09-04 18:47:45 -0700 |
commit | ee575f12f2ab059d9c1b4fa8d6c1e62248c3d11b (patch) | |
tree | cccc06b17c8ac2ac3aff3e3cb15e3ff5b46181c1 | |
parent | 3eb6ef316c2a5ee43d5ecfcf9f10c2d7adc6b819 (diff) | |
download | spark-ee575f12f2ab059d9c1b4fa8d6c1e62248c3d11b.tar.gz spark-ee575f12f2ab059d9c1b4fa8d6c1e62248c3d11b.tar.bz2 spark-ee575f12f2ab059d9c1b4fa8d6c1e62248c3d11b.zip |
[SPARK-2219][SQL] Added support for the "add jar" command
Adds logical and physical command classes for the "add jar" command.
Note that this PR conflicts with and should be merged after #2215.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #2242 from liancheng/add-jar and squashes the following commits:
e43a2f1 [Cheng Lian] Updates AddJar according to conventions introduced in #2215
b99107f [Cheng Lian] Added test case for ADD JAR command
095b2c7 [Cheng Lian] Also forward ADD JAR command to Hive
9be031b [Cheng Lian] Trims Jar path string
8195056 [Cheng Lian] Added support for the "add jar" command
4 files changed, 46 insertions, 8 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index a4dd6be5f9..c98287c6aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command private[hive] case class AddFile(filePath: String) extends Command +private[hive] case class AddJar(path: String) extends Command + private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command private[hive] case class AnalyzeTable(tableName: String) extends Command @@ -231,7 +233,7 @@ private[hive] object HiveQl { } else if (sql.trim.toLowerCase.startsWith("uncache table")) { CacheCommand(sql.trim.drop(14).trim, false) } else if (sql.trim.toLowerCase.startsWith("add jar")) { - NativeCommand(sql) + AddJar(sql.trim.drop(8).trim) } else if (sql.trim.toLowerCase.startsWith("add file")) { AddFile(sql.trim.drop(9)) } else if (sql.trim.toLowerCase.startsWith("dfs")) { @@ -1018,9 +1020,9 @@ private[hive] object HiveQl { /* Other functions */ case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand - case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) => + case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) => Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType)) - case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) => + case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) => Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length)) /* UDFs - Must be last otherwise will preempt built in functions */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 24abb1b5bd..72cc01cdf4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -195,11 +195,12 @@ private[hive] trait HiveStrategies { case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.NativeCommand(sql) => - NativeCommand(sql, plan.output)(context) :: Nil + case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil + case hive.AddJar(path) => execution.AddJar(path) :: Nil + case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil case describe: logical.DescribeCommand => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index a1a4aa7de7..d61c5e274a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Seq.empty[Row] } } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class AddJar(path: String) extends LeafNode with Command { + def hiveContext = sqlContext.asInstanceOf[HiveContext] + + override def output = Seq.empty + + override protected[sql] lazy val sideEffectResult: Seq[Row] = { + hiveContext.runSqlHive(s"ADD JAR $path") + hiveContext.sparkContext.addJar(path) + Seq.empty[Row] + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c4abb3eb48..f4217a52c3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.hive.execution +import java.io.File + import scala.util.Try -import org.apache.spark.sql.{SchemaRDD, Row} +import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -313,7 +315,7 @@ class HiveQuerySuite extends HiveComparisonTest { "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15") test("case sensitivity: registered table") { - val testData: SchemaRDD = + val testData = TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(2, "str2") :: Nil) @@ -467,7 +469,7 @@ class HiveQuerySuite extends HiveComparisonTest { } // Describe a registered temporary table. - val testData: SchemaRDD = + val testData = TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(1, "str2") :: Nil) @@ -495,6 +497,23 @@ class HiveQuerySuite extends HiveComparisonTest { } } + test("ADD JAR command") { + val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath + sql("CREATE TABLE alter1(a INT, b INT)") + intercept[Exception] { + sql( + """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' + |WITH serdeproperties('s1'='9') + """.stripMargin) + } + sql(s"ADD JAR $testJar") + sql( + """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' + |WITH serdeproperties('s1'='9') + """.stripMargin) + sql("DROP TABLE alter1") + } + test("parse HQL set commands") { // Adapted from its SQL counterpart. val testKey = "spark.sql.key.usedfortestonly" |