author     Xin Wu <xinwu@us.ibm.com>           2016-05-23 17:32:01 -0700
committer  Cheng Lian <lian@databricks.com>    2016-05-23 17:32:01 -0700
commit     01659bc50cd3d53815d205d005c3678e714c08e0 (patch)
tree       d439b5175acd61be26237905795fc3398b1a958d /sql
parent     a8e97d17b91684e68290d9f18a43622232aa94e7 (diff)
[SPARK-15431][SQL] Support LIST FILE(s)|JAR(s) command natively
## What changes were proposed in this pull request?

Currently the command `ADD FILE|JAR <filepath | jarpath>` is supported natively in SparkSQL. However, once such a command has run, the added file/jar cannot be looked up afterwards with a `LIST FILE(s)|JAR(s)` command, because `LIST` is passed through to the Hive command processor in Spark-SQL and is simply not supported in spark-shell. There is no way for users to find out which files/jars have been added to the Spark context. Refer to the [Hive commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) documentation.

This PR adds native support for the following commands:
`LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])`

### For example:

##### LIST FILE(s)
```
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false)
+----------------------------------------------+
|result                                        |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
+----------------------------------------------+

scala> spark.sql("list files").show(false)
+----------------------------------------------+
|result                                        |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt |
+----------------------------------------------+
```

##### LIST JAR(s)
```
scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar")
res9: org.apache.spark.sql.DataFrame = [result: int]

scala> spark.sql("list jar TestUDTF.jar").show(false)
+---------------------------------------------+
|result                                       |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+

scala> spark.sql("list jars").show(false)
+---------------------------------------------+
|result                                       |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
```

## How was this patch tested?

New test cases are added for the Spark-SQL, spark-shell, and SparkContext API code paths.

Author: Xin Wu <xinwu@us.ibm.com>
Author: xin Wu <xinwu@us.ibm.com>

Closes #13212 from xwu0226/list_command.
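The new commands are built on the public `SparkContext` resource API, which the new tests also exercise directly. Below is a minimal sketch of that code path; the session setup, app name, and file paths are illustrative placeholders, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the app name and paths are placeholders.
val spark = SparkSession.builder().appName("list-resources-demo").getOrCreate()

// Equivalent to ADD FILE / ADD JAR in SQL.
spark.sparkContext.addFile("/tmp/test.txt")
spark.sparkContext.addJar("/tmp/TestUDTF.jar")

// listFiles()/listJars() are the SparkContext calls that back the new
// LIST FILE(S) / LIST JAR(S) commands.
spark.sparkContext.listFiles().foreach(println)
spark.sparkContext.listJars().foreach(println)
```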
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4                        |   5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                         |  39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala                      |  57
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala |  10
-rw-r--r--  sql/hive-thriftserver/src/test/resources/TestUDTF.jar                                               | bin 0 -> 1328 bytes
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala          |  19
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala                    |  12
7 files changed, 126 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 8ea8f76629..403191af5e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -117,7 +117,7 @@ statement
tableIdentifier partitionSpec? #loadData
| TRUNCATE TABLE tableIdentifier partitionSpec?
(COLUMNS identifierList)? #truncateTable
- | ADD identifier .*? #addResource
+ | op=(ADD | LIST) identifier .*? #manageResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
@@ -642,7 +642,7 @@ nonReserved
| SORT | CLUSTER | DISTRIBUTE | UNSET | TBLPROPERTIES | SKEWED | STORED | DIRECTORIES | LOCATION
| EXCHANGE | ARCHIVE | UNARCHIVE | FILEFORMAT | TOUCH | COMPACT | CONCATENATE | CHANGE
| CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
- | DBPROPERTIES | DFS | TRUNCATE | COMPUTE
+ | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
| STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
| ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
@@ -843,6 +843,7 @@ DFS: 'DFS';
TRUNCATE: 'TRUNCATE';
ANALYZE: 'ANALYZE';
COMPUTE: 'COMPUTE';
+LIST: 'LIST';
STATISTICS: 'STATISTICS';
PARTITIONED: 'PARTITIONED';
EXTERNAL: 'EXTERNAL';
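With the `manageResource` alternative in place, both `ADD` and `LIST` resource statements are recognized by Spark's own parser instead of being handed to Hive. A hedged sketch of statements the new rule routes to `visitManageResource` (the paths are illustrative placeholders):

```scala
// Illustrative statements only; paths are placeholders.
spark.sql("ADD FILE /tmp/test.txt")    // -> AddFileCommand
spark.sql("ADD JAR /tmp/TestUDTF.jar") // -> AddJarCommand
spark.sql("LIST FILES")                // -> ListFilesCommand (all added files)
spark.sql("LIST JAR TestUDTF.jar")     // -> ListJarsCommand (filtered by name)
```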
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6e4af9500c..f85d6062e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -774,13 +774,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create an [[AddJarCommand]] or [[AddFileCommand]] command depending on the requested resource.
+ * Create an [[AddFileCommand]], [[AddJarCommand]], [[ListFilesCommand]] or [[ListJarsCommand]]
+ * command depending on the requested operation on resources.
+ * Expected format:
+ * {{{
+ * ADD (FILE[S] <filepath ...> | JAR[S] <jarpath ...>)
+ * LIST (FILE[S] [filepath ...] | JAR[S] [jarpath ...])
+ * }}}
*/
- override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
- ctx.identifier.getText.toLowerCase match {
- case "file" => AddFileCommand(remainder(ctx.identifier).trim)
- case "jar" => AddJarCommand(remainder(ctx.identifier).trim)
- case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
+ override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) {
+ val maybePaths = remainder(ctx.identifier).trim
+ ctx.op.getType match {
+ case SqlBaseParser.ADD =>
+ ctx.identifier.getText.toLowerCase match {
+ case "file" => AddFileCommand(mayebePaths)
+ case "jar" => AddJarCommand(mayebePaths)
+ case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
+ }
+ case SqlBaseParser.LIST =>
+ ctx.identifier.getText.toLowerCase match {
+ case "files" | "file" =>
+ if (maybePaths.length > 0) {
+ ListFilesCommand(maybePaths.split("\\s+"))
+ } else {
+ ListFilesCommand()
+ }
+ case "jars" | "jar" =>
+ if (maybePaths.length > 0) {
+ ListJarsCommand(maybePaths.split("\\s+"))
+ } else {
+ ListJarsCommand()
+ }
+ case other => throw operationNotAllowed(s"LIST with resource type '$other'", ctx)
+ }
+ case _ => throw operationNotAllowed(s"Other types of operation on resources", ctx)
}
}
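As a reading aid, the dispatch above boils down to: split any trailing path list on whitespace, or list everything when no paths are given. A standalone sketch of that behavior, restated for illustration (not the committed code verbatim):

```scala
// Mirrors the maybePaths handling in visitManageResource: a non-empty
// remainder is split into individual paths; an empty one means "list all".
def listArguments(remainder: String): Seq[String] = {
  val trimmed = remainder.trim
  if (trimmed.nonEmpty) trimmed.split("\\s+").toSeq else Seq.empty
}

listArguments("a.txt  b.txt") // Seq("a.txt", "b.txt") -> ListFilesCommand(...)
listArguments("")             // Seq.empty             -> ListFilesCommand()
```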
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 162d493c1f..20b0894667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -17,9 +17,14 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
* Adds a jar to the current session so it can be used (for UDFs or serdes).
@@ -46,3 +51,51 @@ case class AddFileCommand(path: String) extends RunnableCommand {
Seq.empty[Row]
}
}
+
+/**
+ * Returns a list of file paths that are added to resources.
+ * If file paths are provided, return the ones that are added to resources.
+ */
+case class ListFilesCommand(files: Seq[String] = Seq.empty[String]) extends RunnableCommand {
+ override val output: Seq[Attribute] = {
+ AttributeReference("Results", StringType, nullable = false)() :: Nil
+ }
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val fileList = sparkSession.sparkContext.listFiles()
+ if (files.size > 0) {
+ files.map { f =>
+ val uri = new URI(f)
+ val schemeCorrectedPath = uri.getScheme match {
+ case null | "local" => new File(f).getCanonicalFile.toURI.toString
+ case _ => f
+ }
+ new Path(schemeCorrectedPath).toUri.toString
+ }.collect {
+ case f if fileList.contains(f) => f
+ }.map(Row(_))
+ } else {
+ fileList.map(Row(_))
+ }
+ }
+}
+
+/**
+ * Returns a list of jar files that are added to resources.
+ * If jar files are provided, return the ones that are added to resources.
+ */
+case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends RunnableCommand {
+ override val output: Seq[Attribute] = {
+ AttributeReference("Results", StringType, nullable = false)() :: Nil
+ }
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val jarList = sparkSession.sparkContext.listJars()
+ if (jars.nonEmpty) {
+ for {
+ jarName <- jars.map(f => new Path(f).getName)
+ jarPath <- jarList if jarPath.contains(jarName)
+ } yield Row(jarPath)
+ } else {
+ jarList.map(Row(_))
+ }
+ }
+}
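`ListFilesCommand` normalizes user-supplied paths before matching them against `SparkContext.listFiles()`, so scheme-less and `local:` paths compare equal to their registered `file:` URIs. A minimal standalone sketch of that normalization, extracted for illustration (error handling for malformed URIs is omitted):

```scala
import java.io.File
import java.net.URI
import org.apache.hadoop.fs.Path

// Scheme-less or "local" paths are canonicalized to file: URIs; paths that
// already carry a scheme (e.g. hdfs://) pass through unchanged.
def normalize(path: String): String = {
  val schemeCorrected = new URI(path).getScheme match {
    case null | "local" => new File(path).getCanonicalFile.toURI.toString
    case _              => path
  }
  new Path(schemeCorrected).toUri.toString
}

// normalize("/tmp/test.txt") == "file:/tmp/test.txt" on a typical Linux box,
// while normalize("hdfs://host:8020/tmp/test.txt") is returned unchanged.
```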
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 33ff8aee79..7389e18aef 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor}
-import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, ResetProcessor, SetProcessor}
+import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.thrift.transport.TSocket
@@ -295,9 +294,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
System.exit(0)
}
if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
- cmd_trimmed.startsWith("!") ||
- tokens(0).toLowerCase.equals("list") ||
- isRemoteMode) {
+ cmd_trimmed.startsWith("!") || isRemoteMode) {
val start = System.currentTimeMillis()
super.processCmd(cmd)
val end = System.currentTimeMillis()
@@ -312,7 +309,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
if (proc != null) {
// scalastyle:off println
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
- proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ResetProcessor]) {
+ proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ListResourceProcessor] ||
+ proc.isInstanceOf[ResetProcessor] ) {
val driver = new SparkSQLDriver
driver.init()
diff --git a/sql/hive-thriftserver/src/test/resources/TestUDTF.jar b/sql/hive-thriftserver/src/test/resources/TestUDTF.jar
new file mode 100644
index 0000000000..514f2d5d26
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/resources/TestUDTF.jar
Binary files differ
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 3fa2f884e2..2bf0221c78 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -238,4 +238,23 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
"" -> "This is a test for Spark-11624")
}
+
+ test("list jars") {
+ val jarFile = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
+ runCliWithin(2.minute)(
+ s"ADD JAR $jarFile" -> "",
+ s"LIST JARS" -> "TestUDTF.jar",
+ s"List JAR $jarFile" -> "TestUDTF.jar"
+ )
+ }
+
+ test("list files") {
+ val dataFilePath = Thread.currentThread().getContextClassLoader
+ .getResource("data/files/small_kv.txt")
+ runCliWithin(2.minute)(
+ s"ADD FILE $dataFilePath" -> "",
+ s"LIST FILES" -> "small_kv.txt",
+ s"LIST FILE $dataFilePath" -> "small_kv.txt"
+ )
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e179021491..e0f6ccf04d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -876,6 +876,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
sql("select * from src join t1 on src.key = t1.a")
sql("DROP TABLE t1")
+ assert(sql("list jars").
+ filter(_.getString(0).contains("hive-hcatalog-core-0.13.1.jar")).count() > 0)
+ assert(sql("list jar").
+ filter(_.getString(0).contains("hive-hcatalog-core-0.13.1.jar")).count() > 0)
+ val testJar2 = TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath
+ sql(s"ADD JAR $testJar2")
+ assert(sql(s"list jar $testJar").count() == 1)
}
test("CREATE TEMPORARY FUNCTION") {
@@ -899,6 +906,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
assert(checkAddFileRDD.first())
+ assert(sql("list files").
+ filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
+ assert(sql("list file").
+ filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
+ assert(sql(s"list file $testFile").count() == 1)
}
createQueryTest("dynamic_partition",