diff options
author | Xin Wu <xinwu@us.ibm.com> | 2016-05-23 17:32:01 -0700 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-05-23 17:32:01 -0700 |
commit | 01659bc50cd3d53815d205d005c3678e714c08e0 (patch) | |
tree | d439b5175acd61be26237905795fc3398b1a958d /core | |
parent | a8e97d17b91684e68290d9f18a43622232aa94e7 (diff) | |
download | spark-01659bc50cd3d53815d205d005c3678e714c08e0.tar.gz spark-01659bc50cd3d53815d205d005c3678e714c08e0.tar.bz2 spark-01659bc50cd3d53815d205d005c3678e714c08e0.zip |
[SPARK-15431][SQL] Support LIST FILE(s)|JAR(s) command natively
## What changes were proposed in this pull request?
Currently command `ADD FILE|JAR <filepath | jarpath>` is supported natively in SparkSQL. However, when this command is run, the file/jar is added to the resources that can not be looked up by `LIST FILE(s)|JAR(s)` command because the `LIST` command is passed to Hive command processor in Spark-SQL or simply not supported in Spark-shell. There is no way users can find out what files/jars are added to the spark context.
Refer to [Hive commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli)
This PR is to support following commands:
`LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])`
### For example:
##### LIST FILE(s)
```
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt")
res2: org.apache.spark.sql.DataFrame = []
scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false)
+----------------------------------------------+
|result |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
+----------------------------------------------+
scala> spark.sql("list files").show(false)
+----------------------------------------------+
|result |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt |
+----------------------------------------------+
```
##### LIST JAR(s)
```
scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar")
res9: org.apache.spark.sql.DataFrame = [result: int]
scala> spark.sql("list jar TestUDTF.jar").show(false)
+---------------------------------------------+
|result |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
scala> spark.sql("list jars").show(false)
+---------------------------------------------+
|result |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
```
## How was this patch tested?
New test cases are added for Spark-SQL, Spark-Shell and SparkContext API code path.
Author: Xin Wu <xinwu@us.ibm.com>
Author: xin Wu <xinwu@us.ibm.com>
Closes #13212 from xwu0226/list_command.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 10 | ||||
-rw-r--r-- | core/src/test/resources/TestUDTF.jar | bin | 0 -> 1328 bytes | |||
-rw-r--r-- | core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 14 |
3 files changed, 23 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e6cdd0d298..351024bea4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1387,6 +1387,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * Returns a list of file paths that are added to resources. + */ + def listFiles(): Seq[String] = addedFiles.keySet.toSeq + + /** * Add a file to be downloaded with this Spark job on every node. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, @@ -1724,6 +1729,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } + /** + * Returns a list of jar files that are added to resources. + */ + def listJars(): Seq[String] = addedJars.keySet.toSeq + // Shut down the SparkContext. def stop() { if (LiveListenerBus.withinListenerThread.value) { diff --git a/core/src/test/resources/TestUDTF.jar b/core/src/test/resources/TestUDTF.jar Binary files differnew file mode 100644 index 0000000000..514f2d5d26 --- /dev/null +++ b/core/src/test/resources/TestUDTF.jar diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 63987084ff..ae665138b9 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { assert(byteArray2.length === 0) } - test("addFile works") { + test("basic case for addFile and listFiles") { val dir = Utils.createTempDir() val file1 = File.createTempFile("someprefix1", "somesuffix1", dir) @@ -156,6 +156,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } x }).count() + assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1) + } finally { + sc.stop() + } + } + + test("add and list jar files") { + val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar") + try { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.addJar(jarPath.toString) + assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1) } finally { sc.stop() } |