diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-31 02:01:45 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-31 02:01:45 -0800 |
commit | fcd17a1e8ef1d0f106e845f4de99533d61cd8695 (patch) | |
tree | 9826f029321e93f3286c4420dd290b053c1308ee /streaming | |
parent | 271e3237f3e2efa62a94d6936fce551a40edd65f (diff) | |
download | spark-fcd17a1e8ef1d0f106e845f4de99533d61cd8695.tar.gz spark-fcd17a1e8ef1d0f106e845f4de99533d61cd8695.tar.bz2 spark-fcd17a1e8ef1d0f106e845f4de99533d61cd8695.zip |
Fixed comments and long lines based on comments on PR 289.
Diffstat (limited to 'streaming')
3 files changed, 17 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 0f9a71983e..4960a85b97 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -81,8 +81,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe .. so alleviating it by writing to '.new' and - // then doing moves : which should be pretty fast. + // This is inherently thread unsafe, so alleviating it by writing to '.new' and + // then moving it to the final file val fos = fs.create(writeFile) fos.write(bytes) fos.close() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6628fdcc85..8898fdcb7f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -371,7 +371,8 @@ class StreamingContext private ( /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. - * File names starting with . are ignored. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -390,6 +391,8 @@ class StreamingContext private ( /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory @@ -410,7 +413,9 @@ class StreamingContext private ( /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value - * as Text and input format as TextInputFormat). File names starting with . are ignored. + * as Text and input format as TextInputFormat). Files must be written to the + * monitored directory by "moving" them from another location within the same + * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): DStream[String] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 78d318cf27..aad0d931e7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -256,9 +256,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value - * as Text and input format as TextInputFormat). File names starting with . are ignored. + * as Text and input format as TextInputFormat). Files must be written to the + * monitored directory by "moving" them from another location within the same + * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): JavaDStream[String] = { @@ -300,9 +302,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. - * File names starting with . are ignored. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -331,7 +334,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** - * Creates a input stream from a Flume source. + * Create a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ |