aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorXin Wu <xinwu@us.ibm.com>2016-05-23 17:32:01 -0700
committerCheng Lian <lian@databricks.com>2016-05-23 17:32:01 -0700
commit01659bc50cd3d53815d205d005c3678e714c08e0 (patch)
treed439b5175acd61be26237905795fc3398b1a958d /core
parenta8e97d17b91684e68290d9f18a43622232aa94e7 (diff)
downloadspark-01659bc50cd3d53815d205d005c3678e714c08e0.tar.gz
spark-01659bc50cd3d53815d205d005c3678e714c08e0.tar.bz2
spark-01659bc50cd3d53815d205d005c3678e714c08e0.zip
[SPARK-15431][SQL] Support LIST FILE(s)|JAR(s) command natively
## What changes were proposed in this pull request? Currently command `ADD FILE|JAR <filepath | jarpath>` is supported natively in SparkSQL. However, when this command is run, the file/jar is added to the resources that can not be looked up by `LIST FILE(s)|JAR(s)` command because the `LIST` command is passed to Hive command processor in Spark-SQL or simply not supported in Spark-shell. There is no way users can find out what files/jars are added to the spark context. Refer to [Hive commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) This PR is to support following commands: `LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])` ### For example: ##### LIST FILE(s) ``` scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt") res1: org.apache.spark.sql.DataFrame = [] scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt") res2: org.apache.spark.sql.DataFrame = [] scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false) +----------------------------------------------+ |result | +----------------------------------------------+ |hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt| +----------------------------------------------+ scala> spark.sql("list files").show(false) +----------------------------------------------+ |result | +----------------------------------------------+ |hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt| |hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt | +----------------------------------------------+ ``` ##### LIST JAR(s) ``` scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar") res9: org.apache.spark.sql.DataFrame = [result: int] scala> spark.sql("list jar TestUDTF.jar").show(false) +---------------------------------------------+ |result | +---------------------------------------------+ |spark://192.168.1.234:50131/jars/TestUDTF.jar| +---------------------------------------------+ scala> spark.sql("list jars").show(false) +---------------------------------------------+ |result | +---------------------------------------------+ |spark://192.168.1.234:50131/jars/TestUDTF.jar| +---------------------------------------------+ ``` ## How was this patch tested? New test cases are added for Spark-SQL, Spark-Shell and SparkContext API code path. Author: Xin Wu <xinwu@us.ibm.com> Author: xin Wu <xinwu@us.ibm.com> Closes #13212 from xwu0226/list_command.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala10
-rw-r--r--core/src/test/resources/TestUDTF.jarbin0 -> 1328 bytes
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala14
3 files changed, 23 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e6cdd0d298..351024bea4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1387,6 +1387,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
+ * Returns a list of file paths that are added to resources.
+ */
+ def listFiles(): Seq[String] = addedFiles.keySet.toSeq
+
+ /**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
@@ -1724,6 +1729,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}
+ /**
+ * Returns a list of jar files that are added to resources.
+ */
+ def listJars(): Seq[String] = addedJars.keySet.toSeq
+
// Shut down the SparkContext.
def stop() {
if (LiveListenerBus.withinListenerThread.value) {
diff --git a/core/src/test/resources/TestUDTF.jar b/core/src/test/resources/TestUDTF.jar
new file mode 100644
index 0000000000..514f2d5d26
--- /dev/null
+++ b/core/src/test/resources/TestUDTF.jar
Binary files differ
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 63987084ff..ae665138b9 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
assert(byteArray2.length === 0)
}
- test("addFile works") {
+ test("basic case for addFile and listFiles") {
val dir = Utils.createTempDir()
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
@@ -156,6 +156,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
x
}).count()
+ assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+ } finally {
+ sc.stop()
+ }
+ }
+
+ test("add and list jar files") {
+ val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
+ try {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ sc.addJar(jarPath.toString)
+ assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1)
} finally {
sc.stop()
}