aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-26 12:45:40 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-26 12:45:40 -0800
commit3618d70b2a8a66e9a17a7e2efc8c97a22243073a (patch)
treed2420fd43a0088e9b76db82aa7fa7092101045b0 /streaming
parentbe647191386f4c01e7502776fbdc4884b5cdaac2 (diff)
downloadspark-3618d70b2a8a66e9a17a7e2efc8c97a22243073a.tar.gz
spark-3618d70b2a8a66e9a17a7e2efc8c97a22243073a.tar.bz2
spark-3618d70b2a8a66e9a17a7e2efc8c97a22243073a.zip
Added warning if filestream adds files with no data in them (file RDDs have 0 partitions).
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala7
1 files changed, 7 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 2bb6d91d04..95224282f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -114,6 +114,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+ files.zip(fileRDDs).foreach { case (file, rdd) => {
+ if (rdd.partitions.size == 0) {
+ logWarning("File " + file + " has no data in it. Are you sure you are following " +
+ "the move-based method of adding input files? Refer to the programming guide " +
+ "for more details.")
+ }
+ }}
new UnionRDD(context.sparkContext, fileRDDs)
}