diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 17:47:16 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-23 17:47:16 -0800 |
commit | 0af7f84c8eb631cd2e427b692f407ec2d37dad64 (patch) | |
tree | 2a724dfe75eb9fdf17445e7228ba558214a0e780 /streaming/src | |
parent | 8ca14a1e5157a449d1fa7dc0657079ed82c3c4be (diff) | |
download | spark-0af7f84c8eb631cd2e427b692f407ec2d37dad64.tar.gz spark-0af7f84c8eb631cd2e427b692f407ec2d37dad64.tar.bz2 spark-0af7f84c8eb631cd2e427b692f407ec2d37dad64.zip |
Minor formatting fixes.
Diffstat (limited to 'streaming/src')
3 files changed, 13 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 957c227996..0f9a71983e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -81,7 +81,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and + // then doing moves : which should be pretty fast. val fos = fs.create(writeFile) fos.write(bytes) fos.close() @@ -132,7 +133,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends val startTime = System.currentTimeMillis() val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) val endTime = System.currentTimeMillis() - logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.") + logInfo("CheckpointWriter executor terminated ? " + terminated + + ", waited for " + (endTime - startTime) + " ms.") } private def fs = synchronized { @@ -151,7 +153,8 @@ object CheckpointReader extends Logging { def read(path: String): Checkpoint = { val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), + new Path(path), new Path(path + ".bk")) val compressionCodec = CompressionCodec.createCodec() @@ -166,7 +169,8 @@ object CheckpointReader extends Logging { // loader to find and load classes. This is a well know Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) val zis = compressionCodec.compressedInputStream(fis) - val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) + val ois = new ObjectInputStreamWithLoader(zis, + Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() fs.close() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 71065f98fc..6628fdcc85 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -50,8 +50,6 @@ import org.apache.hadoop.fs.Path import twitter4j.Status import twitter4j.auth.Authorization - - /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic * information (such as, cluster URL and job name) to internally create a SparkContext, it provides diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index b526a43662..4a7c5cf29c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -73,7 +73,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { assert(validTime.milliseconds >= prevModTime, - "Trying to get new files for really old time [" + validTime + " < " + prevModTime) + "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]") // Find new files val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds) @@ -115,8 +115,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas attempts += 1 try { val filter = new CustomPathFilter(currentTime) - val newFiles = fs.listStatus(path, filter) - return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq) + val newFiles = fs.listStatus(path, filter).map(_.getPath.toString) + return (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) } catch { case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe) @@ -238,10 +238,12 @@ private[streaming] object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") + // Disable slack time (i.e. set it to zero) private[streaming] def disableSlackTime() { System.setProperty("spark.streaming.fileStream.slackTime", "0") } + // Restore default value of slack time private[streaming] def restoreSlackTime() { System.clearProperty("spark.streaming.fileStream.slackTime") } |