author    Cheng Lian <lian@databricks.com>    2015-12-01 10:21:31 -0800
committer Michael Armbrust <michael@databricks.com>    2015-12-01 10:21:31 -0800
commit    69dbe6b40df35d488d4ee343098ac70d00bbdafb (patch)
tree      a9e966eeb648874ba5c09d4c5855828d928d8f15 /streaming/src/main
parent    1401166576c7018c5f9c31e0a6703d5fb16ea339 (diff)
[SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues
This PR backports PR #10039 to master.

Author: Cheng Lian <lian@databricks.com>

Closes #10063 from liancheng/spark-12046.doc-fix.master.
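The root cause in most hunks below is the same pair of ScalaDoc quirks: a wiki-style bullet list needs a blank comment line before its first item, and ASCII diagrams must be wrapped in `{{{ ... }}}` to render verbatim. A minimal sketch of both conventions (hypothetical class name):

```scala
/**
 * Hypothetical class illustrating the two ScalaDoc conventions this commit applies.
 *
 *  - Without the blank comment line above, ScalaDoc folds these items into the
 *    preceding paragraph instead of rendering a list.
 *  - Continuation lines are indented to align under the item text.
 *
 * {{{
 *   |<-- preformatted text such as ASCII diagrams survives inside triple braces -->|
 * }}}
 */
class ScalaDocConventionsExample
```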
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                  | 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala          | 19
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala           | 22
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala | 45
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala       |  7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala               |  8
6 files changed, 60 insertions, 52 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index aee172a4f5..6fb8ad38ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -574,11 +574,12 @@ class StreamingContext private[streaming] (
* :: DeveloperApi ::
*
* Return the current state of the context. The context can be in three possible states -
- * - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
- * Input DStreams, transformations and output operations can be created on the context.
- * - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
- * Input DStreams, transformations and output operations cannot be created on the context.
- * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
+ *
+ * - StreamingContextState.INITIALIZED - The context has been created, but not started yet.
+ * Input DStreams, transformations and output operations can be created on the context.
+ * - StreamingContextState.ACTIVE - The context has been started, and has not been stopped.
+ * Input DStreams, transformations and output operations cannot be created on the context.
+ * - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
*/
@DeveloperApi
def getState(): StreamingContextState = synchronized {
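For context, a usage sketch (hypothetical app; the API calls are real) of the three states documented above:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object StateCheckExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StateCheck")
    val ssc = new StreamingContext(conf, Seconds(1))

    ssc.getState() match {
      case StreamingContextState.INITIALIZED =>
        println("DStreams, transformations and output operations may still be defined")
      case StreamingContextState.ACTIVE =>
        println("The graph is frozen; defining new operations would throw")
      case StreamingContextState.STOPPED =>
        println("This context is unusable; create a new one")
    }
  }
}
```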
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 40208a6486..cb5b1f252e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -42,6 +42,7 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
* class remembers the information about the files selected in past batches for
* a certain duration (say, "remember window") as shown in the figure below.
*
+ * {{{
* |<----- remember window ----->|
* ignore threshold --->| |<--- current batch time
* |____.____.____.____.____.____|
@@ -49,6 +50,7 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
* ---------------------|----|----|----|----|----|----|-----------------------> Time
* |____|____|____|____|____|____|
* remembered batches
+ * }}}
*
* The trailing end of the window is the "ignore threshold" and all files whose mod times
* are less than this threshold are assumed to have already been selected and are therefore
@@ -59,14 +61,15 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
* `isNewFile` for more details.
*
* This makes some assumptions about the underlying file system that the system is monitoring.
- * - The clock of the file system is assumed to synchronized with the clock of the machine running
- * the streaming app.
- * - If a file is to be visible in the directory listings, it must be visible within a certain
- * duration of the mod time of the file. This duration is the "remember window", which is set to
- * 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be
- * selected as the mod time will be less than the ignore threshold when it becomes visible.
- * - Once a file is visible, the mod time cannot change. If it does due to appends, then the
- * processing semantics are undefined.
+ *
+ * - The clock of the file system is assumed to be synchronized with the clock of the machine
+ * running the streaming app.
+ * - If a file is to be visible in the directory listings, it must be visible within a certain
+ * duration of the mod time of the file. This duration is the "remember window", which is set to
+ * 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be
+ * selected as the mod time will be less than the ignore threshold when it becomes visible.
+ * - Once a file is visible, the mod time cannot change. If it does due to appends, then the
+ * processing semantics are undefined.
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
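A brief usage sketch of the monitoring behaviour described in the fixed comment (hypothetical paths; files must appear in the directory atomically, with mod times inside the remember window, to be picked up):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileStream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Each batch selects newly visible files whose mod time falls inside the
    // remember window, so write files elsewhere and move them in atomically.
    val lines = ssc.textFileStream("hdfs:///incoming/logs")
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```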
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 421d60ae35..cc7c04bfc9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -84,13 +84,14 @@ private[streaming] class BlockGenerator(
/**
* The BlockGenerator can be in 5 possible states, in the order as follows.
- * - Initialized: Nothing has been started
- * - Active: start() has been called, and it is generating blocks on added data.
- * - StoppedAddingData: stop() has been called, the adding of data has been stopped,
- * but blocks are still being generated and pushed.
- * - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
- * they are still being pushed.
- * - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
+ *
+ * - Initialized: Nothing has been started
+ * - Active: start() has been called, and it is generating blocks on added data.
+ * - StoppedAddingData: stop() has been called, the adding of data has been stopped,
+ * but blocks are still being generated and pushed.
+ * - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
+ * they are still being pushed.
+ * - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
*/
private object GeneratorState extends Enumeration {
type GeneratorState = Value
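The enumeration is shown truncated above; a standalone reconstruction of the pattern with the five documented states (in Spark the object is nested inside BlockGenerator):

```scala
object GeneratorState extends Enumeration {
  type GeneratorState = Value
  // The five states, declared in the order the generator moves through them.
  val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
}
```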
@@ -125,9 +126,10 @@ private[streaming] class BlockGenerator(
/**
* Stop everything in the right order such that all the data added is pushed out correctly.
- * - First, stop adding data to the current buffer.
- * - Second, stop generating blocks.
- * - Finally, wait for queue of to-be-pushed blocks to be drained.
+ *
+ * - First, stop adding data to the current buffer.
+ * - Second, stop generating blocks.
+ * - Finally, wait for the queue of to-be-pushed blocks to be drained.
*/
def stop(): Unit = {
// Set the state to stop adding data
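A standalone sketch (not BlockGenerator's actual implementation) of the three-step teardown the fixed comment describes:

```scala
import java.util.concurrent.ArrayBlockingQueue

class OrderedShutdownSketch {
  @volatile private var acceptingData = true
  @volatile private var generatingBlocks = true
  private val blocksForPushing = new ArrayBlockingQueue[Array[Byte]](10)

  def stop(): Unit = {
    acceptingData = false      // 1. no new data enters the current buffer
    generatingBlocks = false   // 2. the block-cutting timer is told to stop
    while (!blocksForPushing.isEmpty) {
      Thread.sleep(10)         // 3. wait for already-cut blocks to be pushed out
    }
  }
}
```

Stopping in the opposite order would lose data: blocks could still be cut into a queue that is no longer being drained.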
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 234bc8660d..391a461f08 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -27,28 +27,29 @@ import org.apache.spark.streaming.receiver.Receiver
* A class that tries to schedule receivers so that they are evenly distributed. There are two phases for
* scheduling receivers.
*
- * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
- * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
- * It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should
- * update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
- * `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to an location list
- * that contains the scheduled locations. Then when a receiver is starting, it will send a
- * register request and `ReceiverTracker.registerReceiver` will be called. In
- * `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations is set, it should check
- * if the location of this receiver is one of the scheduled locations, if not, the register will
- * be rejected.
- * - The second phase is local scheduling when a receiver is restarting. There are two cases of
- * receiver restarting:
- * - If a receiver is restarting because it's rejected due to the real location and the scheduled
- * locations mismatching, in other words, it fails to start in one of the locations that
- * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
- * still alive in the list of scheduled locations, then use them to launch the receiver job.
- * - If a receiver is restarting without a scheduled locations list, or the executors in the list
- * are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
- * not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear
- * it. Then when this receiver is registering, we can know this is a local scheduling, and
- * `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
- * location is matching.
+ * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
+ * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
+ * It will try to schedule receivers such that they are evenly distributed. ReceiverTracker
+ * should update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
+ * `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to a location list
+ * that contains the scheduled locations. Then when a receiver is starting, it will send a
+ * register request and `ReceiverTracker.registerReceiver` will be called. In
+ * `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations are set, it should
+ * check whether the location of this receiver is one of the scheduled locations; if not, the
+ * registration will be rejected.
+ * - The second phase is local scheduling when a receiver is restarting. There are two cases of
+ * receiver restarting:
+ * - If a receiver is restarting because it was rejected due to a mismatch between its real
+ * location and its scheduled locations (in other words, it failed to start in one of the
+ * locations that `scheduleReceivers` suggested), `ReceiverTracker` should first choose the
+ * executors that are still alive in the list of scheduled locations, then use them to launch
+ * the receiver job.
+ * - If a receiver is restarting without a scheduled locations list, or the executors in the list
+ * are dead, `ReceiverTracker` should call `rescheduleReceiver`. In this case, `ReceiverTracker`
+ * should not set `ReceiverTrackingInfo.scheduledLocations` for this receiver; instead, it
+ * should clear it. Then when this receiver is registering, we know this is local scheduling,
+ * and `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check whether the
+ * launching location matches.
*
* In conclusion, we should make a global schedule, try to follow it exactly for as long as
* possible, and otherwise fall back to local scheduling.
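A much-simplified sketch of the global phase's goal (the real policy also weighs preferred locations and returns candidate lists per receiver): spread receivers across executors round-robin so the load is even.

```scala
object EvenScheduleSketch {
  // Assign each receiver to an executor round-robin, so no executor
  // ends up with more than one extra receiver.
  def scheduleEvenly(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, String] =
    receiverIds.zipWithIndex.map { case (id, i) =>
      id -> executors(i % executors.length)
    }.toMap

  def main(args: Array[String]): Unit = {
    // Map(0 -> host1, 1 -> host2, 2 -> host1, 3 -> host2)
    println(scheduleEvenly(Seq(0, 1, 2, 3), Seq("host1", "host2")))
  }
}
```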
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index f5165f7c39..a99b570835 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -34,9 +34,10 @@ import org.apache.spark.{Logging, SparkConf}
/**
* This class manages write ahead log files.
- * - Writes records (bytebuffers) to periodically rotating log files.
- * - Recovers the log files and the reads the recovered records upon failures.
- * - Cleans up old log files.
+ *
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failure.
+ * - Cleans up old log files.
*
* Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
* and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
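The three responsibilities suggest an interface shape like the following hypothetical trait (not Spark's actual write-ahead-log API):

```scala
import java.nio.ByteBuffer

trait WriteAheadLogSketch {
  /** Append a record to the current rotating log file. */
  def write(record: ByteBuffer, time: Long): Unit

  /** Recover all log files and read back their records after a failure. */
  def readAll(): Iterator[ByteBuffer]

  /** Delete log files older than the given threshold time. */
  def clean(thresholdTime: Long): Unit
}
```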
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index 0148cb51c6..bfb5361405 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -72,10 +72,10 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
/**
* Stop the timer, and return the last time the callback was made.
- * - interruptTimer = true will interrupt the callback
- * if it is in progress (not guaranteed to give correct time in this case).
- * - interruptTimer = false guarantees that there will be at least one callback after `stop` has
- * been called.
+ *
+ * @param interruptTimer True will interrupt the callback if it is in progress (not guaranteed to
+ * give correct time in this case). False guarantees that there will be at
+ * least one callback after `stop` has been called.
*/
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
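A usage sketch of the semantics above, based on the constructor visible in this hunk's context. RecurringTimer and SystemClock are internal to Spark, so this would only compile inside the project itself; treat it as illustrative:

```scala
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.SystemClock

object RecurringTimerSketch {
  def demo(): Unit = {
    val timer = new RecurringTimer(
      new SystemClock(), period = 1000L,
      callback = (time: Long) => println(s"tick at $time"), name = "ExampleTimer")
    timer.start()
    Thread.sleep(3500)
    // interruptTimer = false: at least one more callback fires after this call,
    // and the return value is the time of the last completed callback.
    val lastCallbackTime = timer.stop(interruptTimer = false)
    println(s"last callback at $lastCallbackTime")
  }
}
```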