diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-24 14:01:13 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-24 14:01:13 -0800 |
commit | d4dfab503a9222b5acf5c4bf69b91c16f298e4aa (patch) | |
tree | b04fff3dd233e23122ac8f1a0072be8bea0961b9 /streaming/src | |
parent | 9f79fd89dc84cda7ebeb98a0b43c8e982fefa787 (diff) | |
download | spark-d4dfab503a9222b5acf5c4bf69b91c16f298e4aa.tar.gz spark-d4dfab503a9222b5acf5c4bf69b91c16f298e4aa.tar.bz2 spark-d4dfab503a9222b5acf5c4bf69b91c16f298e4aa.zip |
Fixed Python API for sc.setCheckpointDir. Also other fixes based on Reynold's comments on PR 289.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 11 |
2 files changed, 9 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 4a7c5cf29c..d6514a1fb1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -123,7 +123,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas reset() } } - (Seq(), -1, Seq()) + (Seq.empty, -1, Seq.empty) } /** Generate one RDD from an array of files */ @@ -193,7 +193,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * been seen before (i.e. the file should not be in lastModTimeFiles) */ private[streaming] - class CustomPathFilter(currentTime: Long) extends PathFilter() { + class CustomPathFilter(currentTime: Long) extends PathFilter { // Latest file mod time seen in this round of fetching files and its corresponding files var latestModTime = 0L val latestModTimeFiles = new HashSet[String]() @@ -209,7 +209,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas logDebug("Rejected by filter " + path) return false } else { // Accept file only if - val modTime = fs.getFileStatus(path).getModificationTime() + val modTime = fs.getFileStatus(path).getModificationTime() logDebug("Mod time for " + path + " is " + modTime) if (modTime < prevModTime) { logDebug("Mod time less than last mod time") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 2552d51654..921a33a4cb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -17,16 +17,17 @@ package org.apache.spark.streaming.scheduler
+import akka.actor.{Props, Actor} import org.apache.spark.SparkEnv import org.apache.spark.Logging import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} -import akka.actor.{Props, Actor} -sealed trait JobGeneratorEvent -case class GenerateJobs(time: Time) extends JobGeneratorEvent -case class ClearOldMetadata(time: Time) extends JobGeneratorEvent -case class DoCheckpoint(time: Time) extends JobGeneratorEvent +/** Event classes for JobGenerator */ +private[scheduler] sealed trait JobGeneratorEvent +private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent +private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent +private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent /** * This class generates jobs from DStreams as well as drives checkpointing and cleaning |