aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorsurq <surq@asiainfo.com>2014-11-10 17:37:16 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-11-10 17:37:32 -0800
commit07ba50f7eff3db68f120d979a5f0ca37cb2a886e (patch)
treeb7b751b3905e2eff5b42f85edefbdc69a0b5613d /streaming
parentf0eb0a79cc68c0f254ddf1a1bba672321c84d341 (diff)
downloadspark-07ba50f7eff3db68f120d979a5f0ca37cb2a886e.tar.gz
spark-07ba50f7eff3db68f120d979a5f0ca37cb2a886e.tar.bz2
spark-07ba50f7eff3db68f120d979a5f0ca37cb2a886e.zip
[SPARK-3954][Streaming] Optimization to FileInputDStream
about convert files to RDDS there are 3 loops with files sequence in spark source. loops files sequence: 1.files.map(...) 2.files.zip(fileRDDs) 3.files-size.foreach It's will very time consuming when lots of files.So I do the following correction: 3 loops with files sequence => only one loop Author: surq <surq@asiainfo.com> Closes #2811 from surq/SPARK-3954 and squashes the following commits: 321bbe8 [surq] updated the code style.The style from [for...yield]to [files.map(file=>{})] 88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into SPARK-3954 178066f [surq] modify code's style. [Exceeds 100 columns] 626ef97 [surq] remove redundant import(ArrayBuffer) 739341f [surq] promote the speed of convert files to RDDS (cherry picked from commit ce6ed2abd14de26b9ceaa415e9a42fbb1338f5fa) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8152b7542a..55d6cf6a78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -120,14 +120,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
- val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
- files.zip(fileRDDs).foreach { case (file, rdd) => {
+ val fileRDDs = files.map(file =>{
+ val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
- }}
+ rdd
+ })
new UnionRDD(context.sparkContext, fileRDDs)
}