author     Cheng Lian <lian.cs.zju@gmail.com>         2014-10-31 11:34:51 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-31 11:34:51 -0700
commit     23468e7e96bf047ba53806352558b9d661567b23 (patch)
tree       46900f9a751c74050772fb0860c8a9b51afb9294 /sql
parent     ea465af12ddae424af9b4e742c3d5aed2a0bc8ec (diff)
[SPARK-2220][SQL] Fixes remaining Hive commands
This PR adds support for the `ADD FILE` Hive command, and removes `ShellCommand` and `SourceCommand`. The reason is described in [this SPARK-2220 comment](https://issues.apache.org/jira/browse/SPARK-2220?focusedCommentId=14191841&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14191841).

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #3038 from liancheng/hive-commands and squashes the following commits:

6db61e0 [Cheng Lian] Fixes remaining Hive commands
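For reference, a sketch of the user-visible effect (the `hiveContext` session name is an assumption for illustration, not part of the patch):

    // Supported after this patch, planned as the new execution.AddFile node:
    hiveContext.sql("ADD FILE /path/to/resource.txt")
    // "! <shellCommand>" and "SOURCE <file>" are no longer special-cased by
    // SparkSQLParser; such input now falls through to the fallback parser.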
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala          | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala  | 22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                  |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala              | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala        | 19
5 files changed, 45 insertions(+), 28 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
index 12e8346a64..f5c19ee69c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
@@ -137,7 +137,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
protected val TABLE = Keyword("TABLE")
- protected val SOURCE = Keyword("SOURCE")
protected val UNCACHE = Keyword("UNCACHE")
protected implicit def asParser(k: Keyword): Parser[String] =
@@ -152,8 +151,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
override val lexical = new SqlLexical(reservedWords)
- override protected lazy val start: Parser[LogicalPlan] =
- cache | uncache | set | shell | source | others
+ override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
private lazy val cache: Parser[LogicalPlan] =
CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -171,16 +169,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
case input => SetCommandParser(input)
}
- private lazy val shell: Parser[LogicalPlan] =
- "!" ~> restInput ^^ {
- case input => ShellCommand(input.trim)
- }
-
- private lazy val source: Parser[LogicalPlan] =
- SOURCE ~> restInput ^^ {
- case input => SourceCommand(input.trim)
- }
-
private lazy val others: Parser[LogicalPlan] =
wholeInput ^^ {
case input => fallback(input)
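The simplified `start` rule keeps the same shape as before: try each recognized command parser, then hand everything else to `fallback`. A minimal, self-contained sketch of that pattern (hypothetical types built on scala-parser-combinators, not the real SparkSQLParser):

    import scala.util.parsing.combinator.RegexParsers

    object FallbackParserSketch extends RegexParsers {
      sealed trait Plan
      case class SetCmd(rest: String) extends Plan
      case class Passthrough(sql: String) extends Plan

      // Everything up to the end of input, like restInput in the real parser.
      private val restInput: Parser[String] = """(?s).*""".r

      private val set: Parser[Plan] = """(?i)SET\b""".r ~> restInput ^^ SetCmd
      private val others: Parser[Plan] = restInput ^^ Passthrough

      // Mirrors `start = cache | uncache | set | others` above.
      val start: Parser[Plan] = set | others

      def apply(input: String): Plan = parseAll(start, input).get
    }

    // FallbackParserSketch("SET k=v")  == SetCmd("k=v")
    // FallbackParserSketch("SELECT 1") == Passthrough("SELECT 1")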
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index b8ba2ee428..1d513d7789 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.types.StringType
/**
@@ -41,6 +41,15 @@ case class NativeCommand(cmd: String) extends Command {
/**
* Commands of the form "SET [key [= value] ]".
*/
+case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
+ override def output = Seq(
+ AttributeReference("DFS output", StringType, nullable = false)())
+}
+
+/**
+ *
+ * Commands of the form "SET [key [= value] ]".
+ */
case class SetCommand(kv: Option[(String, Option[String])]) extends Command {
override def output = Seq(
AttributeReference("", StringType, nullable = false)())
@@ -81,14 +90,3 @@ case class DescribeCommand(
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}
-
-/**
- * Returned for the "! shellCommand" command
- */
-case class ShellCommand(cmd: String) extends Command
-
-
-/**
- * Returned for the "SOURCE file" command
- */
-case class SourceCommand(filePath: String) extends Command
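Each logical command node in this file declares the schema of the rows it returns through `output`. A hypothetical example of the same shape (for illustration only; not part of this commit):

    // Hypothetical command following the convention of DFSCommand above:
    // the result schema is a single non-nullable string column.
    case class VersionCommand() extends Command {
      override def output = Seq(
        AttributeReference("version", StringType, nullable = false)())
    }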
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index e59d4d536a..3207ad81d9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -206,6 +206,8 @@ private[hive] trait HiveStrategies {
case hive.AddJar(path) => execution.AddJar(path) :: Nil
+ case hive.AddFile(path) => execution.AddFile(path) :: Nil
+
case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case describe: logical.DescribeCommand =>
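The new case follows the existing pattern in this strategy: each logical command node is translated one-to-one into its physical counterpart during planning. A simplified sketch of that shape (the `SparkPlan` return type and surrounding signature are assumptions, not quoted from the file):

    // Simplified sketch of the strategy's dispatch (hypothetical signature):
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case hive.AddJar(path)  => execution.AddJar(path) :: Nil
      case hive.AddFile(path) => execution.AddFile(path) :: Nil
      case _                  => Nil
    }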
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0fc674af31..903075edf7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -76,3 +76,19 @@ case class AddJar(path: String) extends LeafNode with Command {
Seq.empty[Row]
}
}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class AddFile(path: String) extends LeafNode with Command {
+ def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+ override def output = Seq.empty
+
+ override protected lazy val sideEffectResult: Seq[Row] = {
+ hiveContext.runSqlHive(s"ADD FILE $path")
+ hiveContext.sparkContext.addFile(path)
+ Seq.empty[Row]
+ }
+}
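A sketch of what `sideEffectResult` enables at runtime, mirroring the test added below (`hiveContext` and the file name are assumptions for illustration):

    // Driver side: register the file with Hive and distribute it via Spark.
    hiveContext.sql("ADD FILE /tmp/lookup.txt")

    // Executor side: the distributed copy is resolvable through SparkFiles.
    import java.io.File
    import org.apache.spark.SparkFiles
    val readable = hiveContext.sparkContext
      .parallelize(1 to 2, numSlices = 1)
      .map(_ => new File(SparkFiles.get("lookup.txt")).canRead)
      .first()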
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index ffe1f0b90f..5918f888c8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.hive.execution
+import java.io.File
+
import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkFiles, SparkException}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
@@ -569,7 +571,7 @@ class HiveQuerySuite extends HiveComparisonTest {
|WITH serdeproperties('s1'='9')
""".stripMargin)
}
- // Now only verify 0.12.0, and ignore other versions due to binary compatability
+ // Now only verify 0.12.0, and ignore other versions due to binary compatibility
// current TestSerDe.jar is from 0.12.0
if (HiveShim.version == "0.12.0") {
sql(s"ADD JAR $testJar")
@@ -581,6 +583,17 @@ class HiveQuerySuite extends HiveComparisonTest {
sql("DROP TABLE alter1")
}
+ test("ADD FILE command") {
+ val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile
+ sql(s"ADD FILE $testFile")
+
+ val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { _ =>
+ Iterator.single(new File(SparkFiles.get("v1.txt")).canRead)
+ }
+
+ assert(checkAddFileRDD.first())
+ }
+
case class LogEntry(filename: String, message: String)
case class LogFile(name: String)
@@ -816,7 +829,7 @@ class HiveQuerySuite extends HiveComparisonTest {
createQueryTest("select from thrift based table",
"SELECT * from src_thrift")
-
+
// Put tests that depend on specific Hive settings before these last two test,
// since they modify /clear stuff.
}