From be647191386f4c01e7502776fbdc4884b5cdaac2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 26 Dec 2013 12:33:12 -0800 Subject: Changed file stream to not catch any exceptions related to finding new files (FileNotFound exception is still caught and ignored). --- .../spark/streaming/dstream/FileInputDStream.scala | 30 ++++++++-------------- 1 file changed, 11 insertions(+), 19 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index b163b13a09..2bb6d91d04 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,18 +17,17 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} - +import java.io.{ObjectInputStream, IOException} +import scala.collection.mutable.{HashSet, HashMap} +import scala.reflect.ClassTag import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.UnionRDD +import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} -import scala.collection.mutable.{HashSet, HashMap} -import scala.reflect.ClassTag - -import java.io.{ObjectInputStream, IOException} private[streaming] class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( @@ -106,17 +105,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * (new files found, latest modification time among them, files with latest modification time) */ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { - try { - logDebug("Trying to get new files for time " + currentTime) - val filter = new CustomPathFilter(currentTime) - val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) - return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) - } catch { - case e: Exception => - logError("Attempt to get new files failed", e) - reset() - } - (Seq.empty, -1, Seq.empty) + logDebug("Trying to get new files for time " + currentTime) + val filter = new CustomPathFilter(currentTime) + val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) + (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) } /** Generate one RDD from an array of files */ -- cgit v1.2.3