diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 12:33:12 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 12:33:12 -0800 |
commit | be647191386f4c01e7502776fbdc4884b5cdaac2 (patch) | |
tree | 05dea5eb9948af13b4b10ff7a470be534c1681d3 /streaming | |
parent | bacc65cf28b9f95b129e9adede43f684f2c5ced3 (diff) | |
download | spark-be647191386f4c01e7502776fbdc4884b5cdaac2.tar.gz spark-be647191386f4c01e7502776fbdc4884b5cdaac2.tar.bz2 spark-be647191386f4c01e7502776fbdc4884b5cdaac2.zip |
Changed file stream to not catch any exceptions related to finding new files (FileNotFound exception is still caught and ignored).
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 30 |
1 files changed, 11 insertions, 19 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index b163b13a09..2bb6d91d04 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,18 +17,17 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} - +import java.io.{ObjectInputStream, IOException} +import scala.collection.mutable.{HashSet, HashMap} +import scala.reflect.ClassTag import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.UnionRDD +import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} -import scala.collection.mutable.{HashSet, HashMap} -import scala.reflect.ClassTag - -import java.io.{ObjectInputStream, IOException} private[streaming] class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @@ -106,17 +105,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * (new files found, latest modification time among them, files with latest modification time) */ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { - try { - logDebug("Trying to get new files for time " + currentTime) - val filter = new CustomPathFilter(currentTime) - val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) - return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) - } catch { - case e: Exception => - logError("Attempt to get new files failed", e) - reset() - } - (Seq.empty, -1, Seq.empty) + logDebug("Trying to get new files for time " + currentTime) + val filter = new CustomPathFilter(currentTime) + val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) + (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) } /** Generate one RDD from an array of files */ |