diff options
Diffstat (limited to 'streaming')
5 files changed, 6 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/State.scala b/streaming/src/main/scala/org/apache/spark/streaming/State.scala index 3f560f889f..23cf48eb06 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/State.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/State.scala @@ -178,7 +178,7 @@ private[streaming] class StateImpl[S] extends State[S] { removed } - /** Whether the state has been been updated */ + /** Whether the state has been updated */ def isUpdated(): Boolean = { updated } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index a3c125c306..9a760e2947 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -88,7 +88,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) if (!super.isTimeValid(time)) { false // Time not valid } else { - // Time is valid, but check it it is more than lastValidTime + // Time is valid, but check it is more than lastValidTime if (lastValidTime != null && time < lastValidTime) { logWarning(s"isTimeValid called with $time whereas the last valid time " + s"is $lastValidTime") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala index e8c814ba71..9b6bc71c7a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala @@ -326,7 +326,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B // Create a MapWithStateRDD that has a long lineage using the data RDD with a long lineage val stateRDDWithLongLineage = makeStateRDDWithLongLineageDataRDD(longLineageRDD) - // Create a new MapWithStateRDD, with the lineage lineage MapWithStateRDD as the parent + // Create a new MapWithStateRDD, with the lineage MapWithStateRDD as the parent new MapWithStateRDD[Int, Int, Int, Int]( stateRDDWithLongLineage, stateRDDWithLongLineage.sparkContext.emptyRDD[(Int, Int)].partitionBy(partitioner), diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index a37fac8730..c5e695a33a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -108,7 +108,7 @@ class WriteAheadLogBackedBlockRDDSuite /** * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager - * and the rest to a write ahead log, and then reading reading it all back using the RDD. + * and the rest to a write ahead log, and then reading it all back using the RDD. * It can also test if the partitions that were read from the log were again stored in * block manager. * diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index a1d0561bf3..b70383ecde 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -90,7 +90,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (data1) assert(listener.onAddDataCalled === false) // should be called only with addDataWithCallback() - // Verify addDataWithCallback() add data+metadata and and callbacks are called correctly + // Verify addDataWithCallback() add data+metadata and callbacks are called correctly val data2 = 11 to 20 val metadata2 = data2.map { _.toString } data2.zip(metadata2).foreach { case (d, m) => blockGenerator.addDataWithCallback(d, m) } @@ -103,7 +103,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined } - // Verify addMultipleDataWithCallback() add data+metadata and and callbacks are called correctly + // Verify addMultipleDataWithCallback() add data+metadata and callbacks are called correctly val data3 = 21 to 30 val metadata3 = "metadata" blockGenerator.addMultipleDataWithCallback(data3.iterator, metadata3) |