From 3fc6cfd079d8cdd35574605cb9a4178ca7f2613d Mon Sep 17 00:00:00 2001 From: yongtang Date: Wed, 29 Apr 2015 23:55:51 +0100 Subject: [SPARK-7155] [CORE] Allow newAPIHadoopFile to support comma-separated list of files as input See JIRA: https://issues.apache.org/jira/browse/SPARK-7155 SparkContext's newAPIHadoopFile() does not support comma-separated list of files. For example, the following: ```scala sc.newAPIHadoopFile("/root/file1.txt,/root/file2.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) ``` will throw ``` org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/root/file1.txt,/root/file2.txt ``` However, the other API hadoopFile() is able to process comma-separated list of files correctly. In addition, since sc.textFile() uses hadoopFile(), it is also able to process comma-separated list of files correctly. That means the behaviors of hadoopFile() and newAPIHadoopFile() are not aligned. This pull request fix this issue and allows newAPIHadoopFile() to support comma-separated list of files as input. A unit test has also been added in SparkContextSuite.scala. It creates two temporary text files as the input and tested against sc.textFile(), sc.hadoopFile(), and sc.newAPIHadoopFile(). Note: The contribution is my original work and that I license the work to the project under the project's open source license. Author: yongtang Closes #5708 from yongtang/SPARK-7155 and squashes the following commits: 654c80c [yongtang] [SPARK-7155] [CORE] Remove unneeded temp file deletion in unit test as parent dir is already temporary. 26faa6a [yongtang] [SPARK-7155] [CORE] Support comma-separated list of files as input for newAPIHadoopFile, wholeTextFiles, and binaryFiles. Use setInputPaths for consistency. 73e1f16 [yongtang] [SPARK-7155] [CORE] Allow newAPIHadoopFile to support comma-separated list of files as input. --- .../main/scala/org/apache/spark/SparkContext.scala | 12 +++-- .../scala/org/apache/spark/SparkContextSuite.scala | 63 +++++++++++++++++++++- 2 files changed, 71 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5ae8fb81de..bae951f388 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -713,7 +713,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli RDD[(String, String)] = { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) - NewFileInputFormat.addInputPath(job, new Path(path)) + // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking + // comma separated files as input. (see SPARK-7155) + NewFileInputFormat.setInputPaths(job, path) val updateConf = job.getConfiguration new WholeTextFileRDD( this, @@ -759,7 +761,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli RDD[(String, PortableDataStream)] = { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) - NewFileInputFormat.addInputPath(job, new Path(path)) + // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking + // comma separated files as input. (see SPARK-7155) + NewFileInputFormat.setInputPaths(job, path) val updateConf = job.getConfiguration new BinaryFileRDD( this, @@ -935,7 +939,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // The call to new NewHadoopJob automatically adds security credentials to conf, // so we don't need to explicitly add them ourselves val job = new NewHadoopJob(conf) - NewFileInputFormat.addInputPath(job, new Path(path)) + // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking + // comma separated files as input. (see SPARK-7155) + NewFileInputFormat.setInputPaths(job, path) val updatedConf = job.getConfiguration new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 728558a424..9049db7755 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -25,7 +25,9 @@ import com.google.common.io.Files import org.scalatest.FunSuite -import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.apache.spark.util.Utils import scala.concurrent.Await @@ -213,4 +215,63 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { sc.stop() } } + + test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") { + // Regression test for SPARK-7155 + // dir1 and dir2 are used for wholeTextFiles and binaryFiles + val dir1 = Utils.createTempDir() + val dir2 = Utils.createTempDir() + + val dirpath1=dir1.getAbsolutePath + val dirpath2=dir2.getAbsolutePath + + // file1 and file2 are placed inside dir1, they are also used for + // textFile, hadoopFile, and newAPIHadoopFile + // file3, file4 and file5 are placed inside dir2, they are used for + // textFile, hadoopFile, and newAPIHadoopFile as well + val file1 = new File(dir1, "part-00000") + val file2 = new File(dir1, "part-00001") + val file3 = new File(dir2, "part-00000") + val file4 = new File(dir2, "part-00001") + val file5 = new File(dir2, "part-00002") + + val filepath1=file1.getAbsolutePath + val filepath2=file2.getAbsolutePath + val filepath3=file3.getAbsolutePath + val filepath4=file4.getAbsolutePath + val filepath5=file5.getAbsolutePath + + + try { + // Create 5 text files. + Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8) + Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8) + Files.write("someline1 in file3", file3, UTF_8) + Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8) + Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8) + + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + + // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2 + assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L) + assert(sc.hadoopFile(filepath1 + "," + filepath2, + classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2, + classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + + // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5 + assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L) + assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5, + classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5, + classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L) + + // Test wholeTextFiles, and binaryFiles for dir1 and dir2 + assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L) + assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L) + + } finally { + sc.stop() + } + } } -- cgit v1.2.3