diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-27 12:26:57 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-27 12:26:57 -0800 |
commit | 271e3237f3e2efa62a94d6936fce551a40edd65f (patch) | |
tree | 9edcf433023191ca80286acbd3e6bd5efd9bebbd /streaming/src | |
parent | 3618d70b2a8a66e9a17a7e2efc8c97a22243073a (diff) | |
download | spark-271e3237f3e2efa62a94d6936fce551a40edd65f.tar.gz spark-271e3237f3e2efa62a94d6936fce551a40edd65f.tar.bz2 spark-271e3237f3e2efa62a94d6936fce551a40edd65f.zip |
Minor changes in comments and strings to address comments in PR 289.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 14 |
1 files changed, 6 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 95224282f6..fb9eda8996 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -99,9 +99,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } /** - * Finds files which have modification timestamp <= current time. If some files are being - * deleted in the directory, then it can generate transient exceptions. Hence, multiple - * attempts are made to handle these transient exceptions. Returns 3-tuple + * Find files which have modification timestamp <= current time and return a 3-tuple of * (new files found, latest modification time among them, files with latest modification time) */ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { @@ -116,9 +114,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) files.zip(fileRDDs).foreach { case (file, rdd) => { if (rdd.partitions.size == 0) { - logWarning("File " + file + " has no data in it. Are you sure you are following " + - "the move-based method of adding input files? Refer to the programming guide " + - "for more details.") + logError("File " + file + " has no data in it. Spark Streaming can only ingest " + + "files that have been \"moved\" to the directory assigned to the file stream. " + + "Refer to the streaming programming guide for more details.") } }} new UnionRDD(context.sparkContext, fileRDDs) @@ -181,8 +179,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } /** - * Custom PathFilter class to find new files that have modification timestamps <= current time, but have not - * been seen before (i.e. the file should not be in lastModTimeFiles) + * Custom PathFilter class to find new files that have modification timestamps <= current time, + * but have not been seen before (i.e. the file should not be in lastModTimeFiles) */ private[streaming] class CustomPathFilter(maxModTime: Long) extends PathFilter { |