author     Tathagata Das <tathagata.das1565@gmail.com>  2013-12-23 17:47:16 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-12-23 17:47:16 -0800
commit     0af7f84c8eb631cd2e427b692f407ec2d37dad64 (patch)
tree       2a724dfe75eb9fdf17445e7228ba558214a0e780
parent     8ca14a1e5157a449d1fa7dc0657079ed82c3c4be (diff)
Minor formatting fixes.
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala               | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala         |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala |  8
3 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 957c227996..0f9a71983e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -81,7 +81,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+ // This is inherently thread unsafe .. so alleviating it by writing to '.new' and
+ // then doing moves : which should be pretty fast.
val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
@@ -132,7 +133,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
val startTime = System.currentTimeMillis()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
val endTime = System.currentTimeMillis()
- logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
+ logInfo("CheckpointWriter executor terminated ? " + terminated +
+ ", waited for " + (endTime - startTime) + " ms.")
}
private def fs = synchronized {
@@ -151,7 +153,8 @@ object CheckpointReader extends Logging {
def read(path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
- val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
+ new Path(path), new Path(path + ".bk"))
val compressionCodec = CompressionCodec.createCodec()
@@ -166,7 +169,8 @@ object CheckpointReader extends Logging {
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = compressionCodec.compressedInputStream(fis)
- val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
+ val ois = new ObjectInputStreamWithLoader(zis,
+ Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
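
The comment reformatted in the first hunk describes the write-to-a-temp-file-then-rename pattern that keeps concurrent checkpoint writes from clobbering each other. Below is a minimal sketch of that pattern against the Hadoop FileSystem API; atomicWrite, destFile, and tmpFile are illustrative names, not actual members of CheckpointWriter.

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: write the payload to '<name>.new', then move it into place.
// On HDFS the rename is a cheap metadata operation, which is why the
// comment in the hunk above calls the move "pretty fast".
def atomicWrite(fs: FileSystem, destFile: Path, bytes: Array[Byte]): Unit = {
  val tmpFile = new Path(destFile.getParent, destFile.getName + ".new")
  val out = fs.create(tmpFile)
  try {
    out.write(bytes)       // the slow, interruptible part happens on the temp file
  } finally {
    out.close()
  }
  if (fs.exists(destFile)) {
    fs.delete(destFile, false)   // clear any previous checkpoint file
  }
  fs.rename(tmpFile, destFile)   // move the finished write into place
}
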
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 71065f98fc..6628fdcc85 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -50,8 +50,6 @@ import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
-
-
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
* information (such as, cluster URL and job name) to internally create a SparkContext, it provides
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b526a43662..4a7c5cf29c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -73,7 +73,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
assert(validTime.milliseconds >= prevModTime,
- "Trying to get new files for really old time [" + validTime + " < " + prevModTime)
+ "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
// Find new files
val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
@@ -115,8 +115,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
attempts += 1
try {
val filter = new CustomPathFilter(currentTime)
- val newFiles = fs.listStatus(path, filter)
- return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
+ val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+ return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
} catch {
case ioe: IOException =>
logWarning("Attempt " + attempts + " to get new files failed", ioe)
@@ -238,10 +238,12 @@ private[streaming]
object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
+ // Disable slack time (i.e. set it to zero)
private[streaming] def disableSlackTime() {
System.setProperty("spark.streaming.fileStream.slackTime", "0")
}
+ // Restore default value of slack time
private[streaming] def restoreSlackTime() {
System.clearProperty("spark.streaming.fileStream.slackTime")
}
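
For context, the findNewFiles change above pairs FileSystem.listStatus with a PathFilter so only files of interest are picked up. A minimal, self-contained sketch of that pattern follows; the directory path and the inline filter (which mirrors defaultFilter's hidden-file check) are illustrative stand-ins, not the CustomPathFilter from the diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}

// Sketch only: list a directory through a filter, as findNewFiles does above.
val dir = new Path("hdfs:///tmp/streaming-input")   // illustrative path
val fs = dir.getFileSystem(new Configuration())

// Same check as FileInputDStream.defaultFilter: skip hidden files.
val filter = new PathFilter {
  def accept(path: Path): Boolean = !path.getName.startsWith(".")
}

val newFiles: Seq[String] = fs.listStatus(dir, filter).map(_.getPath.toString).toSeq
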