aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2013-12-26 12:33:12 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2013-12-26 12:33:12 -0800
commit be647191386f4c01e7502776fbdc4884b5cdaac2 (patch)
tree 05dea5eb9948af13b4b10ff7a470be534c1681d3 /streaming
parent bacc65cf28b9f95b129e9adede43f684f2c5ced3 (diff)
download spark-be647191386f4c01e7502776fbdc4884b5cdaac2.tar.gz
spark-be647191386f4c01e7502776fbdc4884b5cdaac2.tar.bz2
spark-be647191386f4c01e7502776fbdc4884b5cdaac2.zip
Changed file stream to not catch any exceptions related to finding new files (FileNotFound exception is still caught and ignored).
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala 30
1 files changed, 11 insertions, 19 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b163b13a09..2bb6d91d04 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,17 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-
+import java.io.{ObjectInputStream, IOException}
+import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-import scala.collection.mutable.{HashSet, HashMap}
-import scala.reflect.ClassTag
-
-import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@@ -106,17 +105,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* (new files found, latest modification time among them, files with latest modification time)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
- try {
- logDebug("Trying to get new files for time " + currentTime)
- val filter = new CustomPathFilter(currentTime)
- val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
- return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
- } catch {
- case e: Exception =>
- logError("Attempt to get new files failed", e)
- reset()
- }
- (Seq.empty, -1, Seq.empty)
+ logDebug("Trying to get new files for time " + currentTime)
+ val filter = new CustomPathFilter(currentTime)
+ val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+ (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
}
/** Generate one RDD from an array of files */