Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala       12
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala  63
2 files changed, 71 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5ae8fb81de..bae951f388 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -713,7 +713,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
RDD[(String, String)] = {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
- NewFileInputFormat.addInputPath(job, new Path(path))
+ // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+ // comma-separated files as input (see SPARK-7155).
+ NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
@@ -759,7 +761,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
RDD[(String, PortableDataStream)] = {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
- NewFileInputFormat.addInputPath(job, new Path(path))
+ // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+ // comma-separated files as input (see SPARK-7155).
+ NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
@@ -935,7 +939,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
- NewFileInputFormat.addInputPath(job, new Path(path))
+ // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
+ // comma-separated files as input (see SPARK-7155).
+ NewFileInputFormat.setInputPaths(job, path)
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
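The three hunks above all make the same substitution. As an illustrative sketch (not part of the patch, assuming a local master and hypothetical paths under /data), the behavior they enable looks like this:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local"))

    // Hypothetical inputs; any existing files/directories would do.
    val files = "/data/dir1/part-00000,/data/dir1/part-00001"
    val dirs  = "/data/dir1,/data/dir2"

    sc.textFile(files).count()        // comma-separated paths already worked here
    sc.wholeTextFiles(dirs).count()   // works after this change (SPARK-7155)
    sc.binaryFiles(dirs).count()      // works after this change (SPARK-7155)
    sc.newAPIHadoopFile(files, classOf[NewTextInputFormat],
      classOf[LongWritable], classOf[Text]).count()  // works after this change (SPARK-7155)

    sc.stop()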
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 728558a424..9049db7755 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -25,7 +25,9 @@ import com.google.common.io.Files
import org.scalatest.FunSuite
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.spark.util.Utils
import scala.concurrent.Await
@@ -213,4 +215,63 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}
+
+ test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
+ // Regression test for SPARK-7155
+ // dir1 and dir2 are used for wholeTextFiles and binaryFiles
+ val dir1 = Utils.createTempDir()
+ val dir2 = Utils.createTempDir()
+
+ val dirpath1 = dir1.getAbsolutePath
+ val dirpath2 = dir2.getAbsolutePath
+
+ // file1 and file2 are placed inside dir1; they are also used for
+ // textFile, hadoopFile, and newAPIHadoopFile
+ // file3, file4, and file5 are placed inside dir2; they are used for
+ // textFile, hadoopFile, and newAPIHadoopFile as well
+ val file1 = new File(dir1, "part-00000")
+ val file2 = new File(dir1, "part-00001")
+ val file3 = new File(dir2, "part-00000")
+ val file4 = new File(dir2, "part-00001")
+ val file5 = new File(dir2, "part-00002")
+
+ val filepath1 = file1.getAbsolutePath
+ val filepath2 = file2.getAbsolutePath
+ val filepath3 = file3.getAbsolutePath
+ val filepath4 = file4.getAbsolutePath
+ val filepath5 = file5.getAbsolutePath
+
+
+ try {
+ // Create 5 text files.
+ Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8)
+ Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8)
+ Files.write("someline1 in file3", file3, UTF_8)
+ Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8)
+ Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8)
+
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+ // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
+ assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
+ assert(sc.hadoopFile(filepath1 + "," + filepath2,
+ classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+ assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
+ classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+ // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
+ assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
+ assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+ classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+ assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+ classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+ // Test wholeTextFiles and binaryFiles for dir1 and dir2
+ assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
+ assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
+
+ } finally {
+ sc.stop()
+ }
+ }
}
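The behavior difference driving the fix comes from Hadoop's FileInputFormat helpers: addInputPath(Job, Path) registers a single literal Path, so a comma-separated string would be treated as one (nonexistent) path, whereas setInputPaths(Job, String) splits the string on commas and registers each part. A minimal sketch of that difference, using hypothetical directories:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

    val job = Job.getInstance()

    // Registers exactly one path; a string like "/data/dir1,/data/dir2" would be
    // taken literally as a single path name.
    NewFileInputFormat.addInputPath(job, new Path("/data/dir1"))

    // Splits on commas and registers both directories, matching textFile/hadoopFile.
    NewFileInputFormat.setInputPaths(job, "/data/dir1,/data/dir2")

    // The input-dir property now holds the two comma-separated directories
    // (setInputPaths replaces any previously registered paths).
    println(job.getConfiguration.get(NewFileInputFormat.INPUT_DIR))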