Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                    | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala            |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala       | 40
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala                     |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala         |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala |  2
6 files changed, 35 insertions(+), 33 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 4808d0fcbc..444261da8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -421,11 +421,11 @@ class StreamingContext private[streaming] (
* by "moving" them from another location within the same file system. File names
* starting with . are ignored.
*
- * '''Note:''' We ensure that the byte array for each record in the
- * resulting RDDs of the DStream has the provided record length.
- *
* @param directory HDFS directory to monitor for new file
* @param recordLength length of each record in bytes
+ *
+ * @note We ensure that the byte array for each record in the
+ * resulting RDDs of the DStream has the provided record length.
*/
def binaryRecordsStream(
directory: String,
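
[Editor's sketch, not part of the patch: minimal Scala usage of binaryRecordsStream. The app name, batch interval, directory, and record length below are made-up examples.]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("BinaryRecordsSketch")
val ssc = new StreamingContext(conf, Seconds(10))
// Every element is an Array[Byte] of exactly recordLength (here 512) bytes,
// as the @note above guarantees.
val records = ssc.binaryRecordsStream("hdfs:///data/incoming", recordLength = 512)
records.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()
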
@@ -447,12 +447,12 @@ class StreamingContext private[streaming] (
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
- * those RDDs, so `queueStream` doesn't support checkpointing.
- *
* @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
+ *
+ * @note Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
*/
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
@@ -465,14 +465,14 @@ class StreamingContext private[streaming] (
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
- * those RDDs, so `queueStream` doesn't support checkpointing.
- *
* @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
* Set as null if no RDD should be returned when empty
* @tparam T Type of objects in the RDD
+ *
+ * @note Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
*/
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
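
[Editor's sketch, not part of the patch: a minimal driver that feeds pre-built RDDs through queueStream, one per batch; names and sizes are illustrative. The queue lives only in the driver, which is why the @note warns that queueStream cannot be checkpointed.]

import scala.collection.mutable.Queue
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("QueueStreamSketch")
val ssc = new StreamingContext(conf, Seconds(1))
val queue = Queue[RDD[Int]]()                      // modifications must be synchronized
val stream = ssc.queueStream(queue, oneAtATime = true)
stream.print()
ssc.start()
queue.synchronized { queue += ssc.sparkContext.makeRDD(1 to 100, numSlices = 4) }
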
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index da9ff85885..aa4003c62e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -74,7 +74,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions)
- /** Method that generates a RDD for the given Duration */
+ /** Method that generates an RDD for the given Duration */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaPairRDD(rdd)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 4c4376a089..b43b9405de 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -218,11 +218,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* for new files and reads them as flat binary files with fixed record lengths,
* yielding byte arrays
*
- * '''Note:''' We ensure that the byte array for each record in the
- * resulting RDDs of the DStream has the provided record length.
- *
* @param directory HDFS directory to monitor for new files
* @param recordLength The length at which to split the records
+ *
+ * @note We ensure that the byte array for each record in the
+ * resulting RDDs of the DStream has the provided record length.
*/
def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = {
ssc.binaryRecordsStream(directory, recordLength)
@@ -352,13 +352,13 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE:
+ * @param queue Queue of RDDs
+ * @tparam T Type of objects in the RDD
+ *
+ * @note
* 1. Changes to the queue after the stream is created will not be recognized.
* 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
- *
- * @param queue Queue of RDDs
- * @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
@@ -372,14 +372,14 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE:
- * 1. Changes to the queue after the stream is created will not be recognized.
- * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
- * those RDDs, so `queueStream` doesn't support checkpointing.
- *
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
+ *
+ * @note
+ * 1. Changes to the queue after the stream is created will not be recognized.
+ * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
+ * those RDDs, so `queueStream` doesn't support checkpointing.
*/
def queueStream[T](
queue: java.util.Queue[JavaRDD[T]],
@@ -396,7 +396,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
- * NOTE:
+ * @note
* 1. Changes to the queue after the stream is created will not be recognized.
* 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
* those RDDs, so `queueStream` doesn't support checkpointing.
@@ -454,9 +454,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams. The order of the JavaRDDs in the transform function parameter will be the
- * same as the order of corresponding DStreams in the list. Note that for adding a
- * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
- * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+ * same as the order of corresponding DStreams in the list.
+ *
+ * @note For adding a JavaPairDStream in the list of JavaDStreams, convert it to a
+ * JavaDStream using [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
* a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
@@ -476,9 +477,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams. The order of the JavaRDDs in the transform function parameter will be the
- * same as the order of corresponding DStreams in the list. Note that for adding a
- * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
- * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+ * same as the order of corresponding DStreams in the list.
+ *
+ * @note For adding a JavaPairDStream in the list of JavaDStreams, convert it to
+ * a JavaDStream using [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
* a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
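
[Editor's sketch, not part of the patch: the reworded @note describes a two-step conversion, so here is what it means in practice, written in Scala against the Java API. jssc, words, and counts are hypothetical inputs, and the body just passes the first RDD through.]

import java.util.{ArrayList => JArrayList, List => JList}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.api.java.function.{Function2 => JFunction2}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream, JavaStreamingContext}

def crossTransform(
    jssc: JavaStreamingContext,
    words: JavaDStream[String],
    counts: JavaPairDStream[String, Integer]): JavaDStream[String] = {
  // Step 1: a JavaPairDStream cannot go into the list directly;
  // convert it to a JavaDStream first.
  val dstreams: JList[JavaDStream[_]] = new JArrayList[JavaDStream[_]]()
  dstreams.add(words)
  dstreams.add(counts.toJavaDStream())
  jssc.transform(dstreams, new JFunction2[JList[JavaRDD[_]], Time, JavaRDD[String]] {
    override def call(rdds: JList[JavaRDD[_]], time: Time): JavaRDD[String] = {
      // Step 2: recover the pair RDD from the JavaRDD that corresponds
      // to the converted DStream.
      val countsRdd = JavaPairRDD.fromJavaRDD(
        rdds.get(1).asInstanceOf[JavaRDD[(String, Integer)]])
      // ... combine rdds.get(0) with countsRdd here; pass-through for the sketch
      rdds.get(0).asInstanceOf[JavaRDD[String]]
    }
  })
}
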
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7e0a2ca609..e23edfa506 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -69,13 +69,13 @@ abstract class DStream[T: ClassTag] (
// Methods that should be implemented by subclasses of DStream
// =======================================================================
- /** Time interval after which the DStream generates a RDD */
+ /** Time interval after which the DStream generates an RDD */
def slideDuration: Duration
/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
- /** Method that generates a RDD for the given time */
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]
// =======================================================================
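
[Editor's sketch, not part of the patch: to see where these abstract members fit, a minimal custom input DStream, modeled loosely on Spark's own ConstantInputDStream; the class name is made up. InputDStream already fixes dependencies to an empty list and slideDuration to the batch interval, leaving only compute() to implement.]

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class ConstantRDDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
  extends InputDStream[T](ssc_) {

  override def start(): Unit = {}   // nothing to set up
  override def stop(): Unit = {}    // nothing to tear down

  /** Method that generates an RDD for the given time: always the same one. */
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}
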
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
index ed08191f41..9512db7d7d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala
@@ -128,7 +128,7 @@ class InternalMapWithStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: Clas
super.initialize(time)
}
- /** Method that generates a RDD for the given time */
+ /** Method that generates an RDD for the given time */
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the previous state or create a new empty state RDD
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index ce5a6e00fb..a37fac8730 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -186,7 +186,7 @@ class WriteAheadLogBackedBlockRDDSuite
assert(rdd.collect() === data.flatten)
// Verify that the block fetching is skipped when isBlockValid is set to false.
- // This is done by using a RDD whose data is only in memory but is set to skip block fetching
+ // This is done by using an RDD whose data is only in memory but is set to skip block fetching
// Using that RDD will throw exception, as it skips block fetching even if the blocks are in
// in BlockManager.
if (testIsBlockValid) {