-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java  2
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java  2
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskEndReason.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala  2
-rw-r--r--  docs/ml-features.md  2
-rw-r--r--  docs/mllib-statistics.md  2
-rw-r--r--  docs/structured-streaming-kafka-integration.md  2
-rw-r--r--  docs/structured-streaming-programming-guide.md  2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java  2
-rw-r--r--  examples/src/main/python/ml/lda_example.py  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala  2
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala  2
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala  2
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala  2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala  2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala  2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala  2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala  2
-rw-r--r--  python/pyspark/ml/clustering.py  2
-rw-r--r--  python/pyspark/ml/linalg/__init__.py  4
-rw-r--r--  python/pyspark/sql/utils.py  2
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala  2
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala  2
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java  2
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java  2
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java  2
-rw-r--r--  sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java  2
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/State.scala  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala  4
52 files changed, 57 insertions, 57 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 5b42843717..f219c5605b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -86,7 +86,7 @@ public final class UnsafeInMemorySorter {
private final PrefixComparators.RadixSortSupport radixSortSupport;
/**
- * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at
+ * Within this buffer, position {@code 2 * i} holds a pointer to the record at
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*
* Only part of the array will be used to store the pointers, the rest part is preserved as
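As a hedged illustration of the (pointer, prefix) layout this comment describes — a standalone Scala sketch with made-up values, not Spark's internal API:

    // Record i occupies two slots of a long[] buffer:
    // slot 2*i holds the record pointer, slot 2*i + 1 holds its 8-byte key prefix.
    val buffer = new Array[Long](2 * 4)              // room for 4 (pointer, prefix) pairs
    def write(i: Int, recordPointer: Long, keyPrefix: Long): Unit = {
      buffer(2 * i) = recordPointer
      buffer(2 * i + 1) = keyPrefix
    }
    write(0, 0x0000000100000080L, 42L)               // hypothetical pointer and prefix values
    val pointer = buffer(0)                          // record pointer of record 0
    val prefix  = buffer(1)                          // key prefix of record 0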
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
index 430bf677ed..d9f84d10e9 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
@@ -25,7 +25,7 @@ import org.apache.spark.util.collection.SortDataFormat;
* Supports sorting an array of (record pointer, key prefix) pairs.
* Used in {@link UnsafeInMemorySorter}.
* <p>
- * Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at
+ * Within each long[] buffer, position {@code 2 * i} holds a pointer to the record at
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
*/
public final class UnsafeSortDataFormat
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6f5c31d7ab..4ca442b629 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -317,7 +317,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
pool
}
- // Make sure that that we aren't going to exceed the max RPC message size by making sure
+ // Make sure that we aren't going to exceed the max RPC message size by making sure
// we use broadcast to send large map output statuses.
if (minSizeForBroadcast > maxRpcMessageSize) {
val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " +
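A hedged sketch of the constraint validated here; the property names come from the surrounding code and Spark's configuration, but the concrete values are illustrative only:

    import org.apache.spark.SparkConf
    // minSizeForBroadcast must stay below the max RPC message size, otherwise a large map
    // output status could be too big for an RPC yet never fall back to the broadcast path.
    val conf = new SparkConf()
      .set("spark.rpc.message.maxSize", "128")                      // MiB, upper bound for RPC payloads
      .set("spark.shuffle.mapOutput.minSizeForBroadcast", "512k")   // must stay well below 128 MiB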
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7745387dbc..8c1b5f7bf0 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -98,7 +98,7 @@ case class FetchFailed(
* 4 task failures, instead we immediately go back to the stage which generated the map output,
* and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
* presumably its not the fault of the executor where the task ran, but the executor which
- * stored the data. This is especially important because we we might rack up a bunch of
+ * stored the data. This is especially important because we might rack up a bunch of
* fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
*/
override def countTowardsTaskFailures: Boolean = false
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 79f4d06c84..320af5cf97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -43,7 +43,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* Execute using
* ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
*
- * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
+ * Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
* *and* SPARK_JAVA_OPTS:
* - spark.deploy.recoveryMode=ZOOKEEPER
* - spark.deploy.zookeeper.url=172.17.42.1:2181
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 2b00a4a6b3..54f39f7620 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -291,7 +291,7 @@ object HistoryServer extends Logging {
/**
* Create a security manager.
- * This turns off security in the SecurityManager, so that the the History Server can start
+ * This turns off security in the SecurityManager, so that the History Server can start
* in a Spark cluster where security is enabled.
* @param config configuration for the SecurityManager constructor
* @return the security manager for use in constructing the History Server.
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index f7a991770d..8dd1a1ea05 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -92,7 +92,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
/**
- * Resets the value of the current metrics (`this`) and and merges all the independent
+ * Resets the value of the current metrics (`this`) and merges all the independent
* [[TempShuffleReadMetrics]] into `this`.
*/
private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index dc70eb82d2..3d4ea3cccc 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
/**
- * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
+ * A BlockTransferService that uses Netty to fetch a set of blocks at a time.
*/
private[spark] class NettyBlockTransferService(
conf: SparkConf,
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 386fdfd218..3bfdf95db8 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -350,7 +350,7 @@ object SizeEstimator extends Logging {
// 3. consistent fields layouts throughout the hierarchy: This means we should layout
// superclass first. And we can use superclass's shellSize as a starting point to layout the
// other fields in this class.
- // 4. class alignment: HotSpot rounds field blocks up to to HeapOopSize not 4 bytes, confirmed
+ // 4. class alignment: HotSpot rounds field blocks up to HeapOopSize not 4 bytes, confirmed
// with Aleksey. see https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
//
// The real world field layout is much more complicated. There are three kinds of fields
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index e3304be792..7998e3702c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -253,7 +253,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
assertNotFound(appId, None)
}
- test("Test that if an attempt ID is is set, it must be used in lookups") {
+ test("Test that if an attempt ID is set, it must be used in lookups") {
val operations = new StubCacheOperations()
val clock = new ManualClock(1)
implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 5e8a854e46..f3d3f701af 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1819,7 +1819,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA")))
- // Reducer should run where RDD 2 has preferences, even though though it also has a shuffle dep
+ // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(Seq("hostB")))
complete(reduceTaskSet, Seq((Success, 42)))
@@ -2058,7 +2058,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
- assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on on hostA
+ assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 7f0838268a..c8b6a3346a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -53,7 +53,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
conf
}
- test("single insert insert") {
+ test("single insert") {
val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
val map = createExternalMap[Int]
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 1d3449746c..d67fce3c95 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -752,7 +752,7 @@ for more details on the API.
`Interaction` is a `Transformer` which takes vector or double-valued columns, and generates a single vector column that contains the product of all combinations of one value from each input column.
-For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then then you'll get a 9-dimensional vector as the output column.
+For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then you'll get a 9-dimensional vector as the output column.
**Examples**
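As a quick worked illustration of that count (input values assumed): interacting the two 3-dimensional vectors [1.0, 2.0, 3.0] and [4.0, 5.0, 6.0] yields the 9-dimensional vector of all pairwise products, [4.0, 5.0, 6.0, 8.0, 10.0, 12.0, 12.0, 15.0, 18.0].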
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 12797bd868..430c069045 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -354,7 +354,7 @@ v = u.map(lambda x: 1.0 + 2.0 * x)
useful for visualizing empirical probability distributions without requiring assumptions about the
particular distribution that the observed samples are drawn from. It computes an estimate of the
probability density function of a random variables, evaluated at a given set of points. It achieves
-this estimate by expressing the PDF of the empirical distribution at a particular point as the the
+this estimate by expressing the PDF of the empirical distribution at a particular point as the
mean of PDFs of normal distributions centered around each of the samples.
<div class="codetabs">
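In symbols, this is the familiar Gaussian kernel density estimate; a hedged reconstruction for samples $x_1, \ldots, x_n$ and bandwidth $\sigma$:

$$\hat{f}(x) = \frac{1}{n} \sum_{i=1}^{n} \frac{1}{\sqrt{2\pi}\,\sigma} \exp\!\left(-\frac{(x - x_i)^2}{2\sigma^2}\right)$$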
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 2458bb5ffa..9b82e8e744 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -244,7 +244,7 @@ Note that the following Kafka params cannot be set and the Kafka source will thr
- **group.id**: Kafka source will create a unique group id for each query automatically.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
where to start instead. Structured Streaming manages which offsets are consumed internally, rather
- than rely on the kafka Consumer to do it. This will ensure that no data is missed when when new
+ than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
Streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
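A minimal sketch of setting `startingOffsets` explicitly when creating the source, assuming an existing SparkSession `spark`, a broker at `host1:9092`, and a topic `topic1`:

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")   // only applies to a newly started query; resuming uses the checkpoint
      .load()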
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 799f636505..6cd050e4f2 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -680,7 +680,7 @@ windowedCounts = words.groupBy(
### Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application.
-For example, say, a word generated at 12:04 (i.e. event time) could be received received by
+For example, say, a word generated at 12:04 (i.e. event time) could be received by
the application at 12:11. The application should use the time 12:04 instead of 12:11
to update the older counts for the window `12:00 - 12:10`. This occurs
naturally in our window-based grouping – Structured Streaming can maintain the intermediate state
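A minimal sketch of bounding such late data with a watermark, assuming a streaming Dataset `words` with columns `timestamp` and `word` as in the earlier examples of this guide:

    import org.apache.spark.sql.functions.{col, window}
    val windowedCounts = words
      .withWatermark("timestamp", "10 minutes")   // accept data arriving up to 10 minutes late
      .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
      .count()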
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java
index 9041244279..0e5d00565b 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java
@@ -52,7 +52,7 @@ public class JavaLDAExample {
double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
- System.out.println("The upper bound bound on perplexity: " + lp);
+ System.out.println("The upper bound on perplexity: " + lp);
// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
diff --git a/examples/src/main/python/ml/lda_example.py b/examples/src/main/python/ml/lda_example.py
index 2dc1742ff7..a8b346f72c 100644
--- a/examples/src/main/python/ml/lda_example.py
+++ b/examples/src/main/python/ml/lda_example.py
@@ -46,7 +46,7 @@ if __name__ == "__main__":
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
- print("The upper bound bound on perplexity: " + str(lp))
+ print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(3)
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala
index 22b3b0e3ad..4215d37cb5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala
@@ -50,7 +50,7 @@ object LDAExample {
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
- println(s"The upper bound bound on perplexity: $lp")
+ println(s"The upper bound on perplexity: $lp")
// Describe topics.
val topics = model.describeTopics(3)
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 41f27e9376..e5b63aa1a7 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -45,7 +45,7 @@ import org.apache.flume.sink.AbstractSink
* the thread itself is blocked and a reference to it saved off.
*
* When the ack for that batch is received,
- * the thread which created the transaction is is retrieved and it commits the transaction with the
+ * the thread which created the transaction is retrieved and it commits the transaction with the
* channel from the same thread it was originally created in (since Flume transactions are
* thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
* is received within the specified timeout, the transaction is rolled back too. If an ack comes
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index aa01238f91..ff9965b854 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -212,7 +212,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
|Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest'
|to specify where to start. Structured Streaming manages which offsets are consumed
|internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
- |data is missed when when new topics/partitions are dynamically subscribed. Note that
+ |data is missed when new topics/partitions are dynamically subscribed. Note that
|'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and
|that resuming will always pick up from where the query left off. See the docs for more
|details.
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index a4d81a6809..18a5a1509a 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -129,7 +129,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
/**
* Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
- * and the rest to a write ahead log, and then reading reading it all back using the RDD.
+ * and the rest to a write ahead log, and then reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
*
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 583e5e0928..728a883b1a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -512,7 +512,7 @@ abstract class LDAModel private[ml] (
}
/**
- * Calculate an upper bound bound on perplexity. (Lower is better.)
+ * Calculate an upper bound on perplexity. (Lower is better.)
* See Equation (16) in the Online LDA paper (Hoffman et al., 2010).
*
* WARNING: If this model is an instance of [[DistributedLDAModel]] (produced when [[optimizer]]
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
index 5cbfbff3e4..4d6520d0b2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
@@ -54,7 +54,7 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) {
}
/**
- * Finds words similar to the the vector representation of a word without
+ * Finds words similar to the vector representation of a word without
* filtering results.
* @param vector a vector
* @param num number of synonyms to find
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 25ffd8561f..933a5f1d52 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -245,7 +245,7 @@ class LocalLDAModel private[spark] (
}
/**
- * Calculate an upper bound bound on perplexity. (Lower is better.)
+ * Calculate an upper bound on perplexity. (Lower is better.)
* See Equation (16) in original Online LDA paper.
*
* @param documents test corpus to use for calculating perplexity
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index 46deb545af..f44c8fe351 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.dstream.DStream
/**
* :: DeveloperApi ::
* StreamingLinearAlgorithm implements methods for continuously
- * training a generalized linear model model on streaming data,
+ * training a generalized linear model on streaming data,
* and using it for prediction on (possibly different) streaming data.
*
* This class takes as type parameters a GeneralizedLinearModel,
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 35d0aefa04..54510e0bea 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -699,7 +699,7 @@ class LDAModel(JavaModel):
@since("2.0.0")
def logPerplexity(self, dataset):
"""
- Calculate an upper bound bound on perplexity. (Lower is better.)
+ Calculate an upper bound on perplexity. (Lower is better.)
See Equation (16) in the Online LDA paper (Hoffman et al., 2010).
WARNING: If this model is an instance of :py:class:`DistributedLDAModel` (produced when
diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py
index 1705c156ce..b765343251 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -481,7 +481,7 @@ class SparseVector(Vector):
>>> SparseVector(4, {1:1.0, 6:2.0})
Traceback (most recent call last):
...
- AssertionError: Index 6 is out of the the size of vector with size=4
+ AssertionError: Index 6 is out of the size of vector with size=4
>>> SparseVector(4, {-1:1.0})
Traceback (most recent call last):
...
@@ -521,7 +521,7 @@ class SparseVector(Vector):
if self.indices.size > 0:
assert np.max(self.indices) < self.size, \
- "Index %d is out of the the size of vector with size=%d" \
+ "Index %d is out of the size of vector with size=%d" \
% (np.max(self.indices), self.size)
assert np.min(self.indices) >= 0, \
"Contains negative index %d" % (np.min(self.indices))
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 2a85ec01bc..7bc6a59ad3 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -95,7 +95,7 @@ def install_exception_handler():
original = py4j.protocol.get_return_value
# The original `get_return_value` is not patched, it's idempotent.
patched = capture_sql_exception(original)
- # only patch the one used in in py4j.java_gateway (call Java API)
+ # only patch the one used in py4j.java_gateway (call Java API)
py4j.java_gateway.get_return_value = patched
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 8772e26f43..fb2d61f621 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -32,7 +32,7 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
/**
* This strategy is calculating the optimal locality preferences of YARN containers by considering
- * the node ratio of pending tasks, number of required cores/containers and and locality of current
+ * the node ratio of pending tasks, number of required cores/containers and locality of current
* existing and pending allocated containers. The target of this algorithm is to maximize the number
* of tasks that would run locally.
*
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index d205547698..86de90984c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -196,7 +196,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
assertIndexIsValid(i);
BitSetMethods.set(baseObject, baseOffset, i);
// To preserve row equality, zero out the value when setting the column to null.
- // Since this row does does not currently support updates to variable-length values, we don't
+ // Since this row does not currently support updates to variable-length values, we don't
// have to worry about zeroing out that data.
Platform.putLong(baseObject, getFieldOffset(i), 0);
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 13115f4728..07d294b108 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -516,7 +516,7 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
* into the number of buckets); both variables are based on the size of the current partition.
* During the calculation process the function keeps track of the current row number, the current
* bucket number, and the row number at which the bucket will change (bucketThreshold). When the
- * current row number reaches bucket threshold, the bucket value is increased by one and the the
+ * current row number reaches the bucket threshold, the bucket value is increased by one and the
* threshold is increased by the bucket size (plus one extra if the current bucket is padded).
*
* This documentation has been based upon similar documentation for the Hive and Presto projects.
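A hedged arithmetic sketch of that bookkeeping, with assumed values:

    // 10 rows split into 4 buckets: base bucket size 2, and 10 % 4 = 2 buckets get one padding row.
    val n = 10
    val buckets = 4
    val bucketSize = n / buckets   // 2
    val padded = n % buckets       // 2 -> buckets 1 and 2 hold 3 rows, buckets 3 and 4 hold 2
    // bucketThreshold therefore advances through 3, 6, 8 and finally 10 as rows are consumed.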
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d583fa31b0..c977e788b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -795,7 +795,7 @@ case object OneRowRelation extends LeafNode {
/**
* Computes [[Statistics]] for this plan. The default implementation assumes the output
- * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
+ * cardinality is the product of all child plan's cardinality, i.e. applies in the case
* of cartesian joins.
*
* [[LeafNode]]s must override this.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 235ca8d263..a96a3b7af2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -142,7 +142,7 @@ object DateTimeUtils {
}
/**
- * Returns the number of days since epoch from from java.sql.Date.
+ * Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
millisToDays(date.getTime)
@@ -503,7 +503,7 @@ object DateTimeUtils {
}
/**
- * Calculates the year and and the number of the day in the year for the given
+ * Calculates the year and the number of the day in the year for the given
* number of days. The given days is the number of days since 1.1.1970.
*
* The calculation uses the fact that the period 1.1.2001 until 31.12.2400 is
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
index 97cfb5f06d..273f95f91e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
@@ -52,7 +52,7 @@ class AttributeSetSuite extends SparkFunSuite {
assert((aSet ++ bSet).contains(aLower) === true)
}
- test("extracts all references references") {
+ test("extracts all references ") {
val addSet = AttributeSet(Add(aUpper, Alias(bUpper, "test")()):: Nil)
assert(addSet.contains(aUpper))
assert(addSet.contains(aLower))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c1cedd8541..2a06f3c47c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -361,7 +361,7 @@ class Dataset[T] private[sql](
* method used to map columns depend on the type of `U`:
* - When `U` is a class, fields for the class will be mapped to columns of the same name
* (case sensitivity is determined by `spark.sql.caseSensitive`).
- * - When `U` is a tuple, the columns will be be mapped by ordinal (i.e. the first column will
+ * - When `U` is a tuple, the columns will be mapped by ordinal (i.e. the first column will
* be assigned to `_1`).
* - When `U` is a primitive type (i.e. String, Int, etc), then the first column of the
* `DataFrame` will be used.
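A hedged sketch of the first two mapping rules, assuming an existing SparkSession `spark` and a DataFrame `df` with columns `name` (string) and `age` (int):

    import spark.implicits._
    case class Person(name: String, age: Int)
    val byName  = df.as[Person]          // class: columns bound to fields by name
    val byOrder = df.as[(String, Int)]   // tuple: columns bound by ordinal (_1, _2)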
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index bc290702dc..bad59961ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -41,7 +41,7 @@ object PartitionPath {
}
/**
- * Holds a directory in a partitioned collection of files as well as as the partition values
+ * Holds a directory in a partitioned collection of files as well as the partition values
* in the form of a Row. Before scanning, the files at `path` need to be enumerated.
*/
case class PartitionPath(values: InternalRow, path: Path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 0ce47b152c..0b39965b31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -285,7 +285,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
* Starts the execution of the streaming query, which will continually send results to the given
- * `ForeachWriter` as as new data arrives. The `ForeachWriter` can be used to send the data
+ * `ForeachWriter` as new data arrives. The `ForeachWriter` can be used to send the data
* generated by the `DataFrame`/`Dataset` to an external system.
*
* Scala example:
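A minimal sketch in the spirit of the Scala example referenced above, assuming a streaming Dataset[String] named `streamingDs`:

    import org.apache.spark.sql.ForeachWriter
    val query = streamingDs.writeStream
      .foreach(new ForeachWriter[String] {
        def open(partitionId: Long, version: Long): Boolean = true   // e.g. open a connection
        def process(value: String): Unit = println(value)            // hand one record to the external system
        def close(errorOrNull: Throwable): Unit = ()                 // release resources
      })
      .start()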
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 4296ec543e..22d5c47a6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -257,7 +257,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
}
}
- test("time window in SQL with with two expressions") {
+ test("time window in SQL with two expressions") {
withTempTable { table =>
checkAnswer(
spark.sql(
@@ -272,7 +272,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
}
}
- test("time window in SQL with with three expressions") {
+ test("time window in SQL with three expressions") {
withTempTable { table =>
checkAnswer(
spark.sql(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 375da224aa..0bfc92fdb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -363,7 +363,7 @@ class PlannerSuite extends SharedSQLContext {
// This is a regression test for SPARK-9703
test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") {
// Consider an operator that imposes both output distribution and ordering requirements on its
- // children, such as sort sort merge join. If the distribution requirements are satisfied but
+ // children, such as sort merge join. If the distribution requirements are satisfied but
// the output ordering requirements are unsatisfied, then the planner should only add sorts and
// should not need to add additional shuffles / exchanges.
val outputOrdering = Seq(SortOrder(Literal(1), Ascending))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 22f59f63d6..f67444fbc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -144,7 +144,7 @@ class FileStreamSinkSuite extends StreamTest {
}
// This tests whether FileStreamSink works with aggregations. Specifically, it tests
- // whether the the correct streaming QueryExecution (i.e. IncrementalExecution) is used to
+ // whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used
// to execute the trigger for writing data to file sink. See SPARK-18440 for more details.
test("writing with aggregation") {
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java
index c2a2b2d478..9dd0efc039 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/AbstractService.java
@@ -151,7 +151,7 @@ public abstract class AbstractService implements Service {
}
/**
- * Verify that that a service is in a given state.
+ * Verify that a service is in a given state.
*
* @param currentState
* the desired state
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java
index 8946219d85..a2c580d6ac 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceOperations.java
@@ -33,7 +33,7 @@ public final class ServiceOperations {
}
/**
- * Verify that that a service is in a given state.
+ * Verify that a service is in a given state.
* @param state the actual state a service is in
* @param expectedState the desired state
* @throws IllegalStateException if the service state is different from
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java
index d1aadad04c..a1ff10dc2b 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/ServiceStateChangeListener.java
@@ -29,7 +29,7 @@ public interface ServiceStateChangeListener {
* have changed state before this callback is invoked.
*
* This operation is invoked on the thread that initiated the state change,
- * while the service itself in in a synchronized section.
+ * while the service itself is in a synchronized section.
* <ol>
* <li>Any long-lived operation here will prevent the service state
* change from completing in a timely manner.</li>
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java
index 562b3f5e67..b80fd67884 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/TypeDescriptor.java
@@ -98,7 +98,7 @@ public class TypeDescriptor {
* For datetime types this is the length in characters of the String representation
* (assuming the maximum allowed precision of the fractional seconds component).
* For binary data this is the length in bytes.
- * Null is returned for for data types where the column size is not applicable.
+ * Null is returned for data types where the column size is not applicable.
*/
public Integer getColumnSize() {
if (type.isNumericType()) {
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 5cd4935e22..d217e9b4fe 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -178,7 +178,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"skewjoin",
"database",
- // These tests fail and and exit the JVM.
+ // These tests fail and exit the JVM.
"auto_join18_multi_distinct",
"join18_multi_distinct",
"input44",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index c80695bd3e..7ee5fc543c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
* The Hive table scan operator. Column and partition pruning are both handled.
*
* @param requestedAttributes Attributes to be fetched from the Hive table.
- * @param relation The Hive table be be scanned.
+ * @param relation The Hive table to be scanned.
* @param partitionPruningPred An optional partition pruning predicate for partitioned table.
*/
private[hive]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/State.scala b/streaming/src/main/scala/org/apache/spark/streaming/State.scala
index 3f560f889f..23cf48eb06 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/State.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/State.scala
@@ -178,7 +178,7 @@ private[streaming] class StateImpl[S] extends State[S] {
removed
}
- /** Whether the state has been been updated */
+ /** Whether the state has been updated */
def isUpdated(): Boolean = {
updated
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a3c125c306..9a760e2947 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -88,7 +88,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
if (!super.isTimeValid(time)) {
false // Time not valid
} else {
- // Time is valid, but check it it is more than lastValidTime
+ // Time is valid, but check it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
logWarning(s"isTimeValid called with $time whereas the last valid time " +
s"is $lastValidTime")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
index e8c814ba71..9b6bc71c7a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
@@ -326,7 +326,7 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
// Create a MapWithStateRDD that has a long lineage using the data RDD with a long lineage
val stateRDDWithLongLineage = makeStateRDDWithLongLineageDataRDD(longLineageRDD)
- // Create a new MapWithStateRDD, with the lineage lineage MapWithStateRDD as the parent
+ // Create a new MapWithStateRDD, with the lineage MapWithStateRDD as the parent
new MapWithStateRDD[Int, Int, Int, Int](
stateRDDWithLongLineage,
stateRDDWithLongLineage.sparkContext.emptyRDD[(Int, Int)].partitionBy(partitioner),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index a37fac8730..c5e695a33a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -108,7 +108,7 @@ class WriteAheadLogBackedBlockRDDSuite
/**
* Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
- * and the rest to a write ahead log, and then reading reading it all back using the RDD.
+ * and the rest to a write ahead log, and then reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
*
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index a1d0561bf3..b70383ecde 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -90,7 +90,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (data1)
assert(listener.onAddDataCalled === false) // should be called only with addDataWithCallback()
- // Verify addDataWithCallback() add data+metadata and and callbacks are called correctly
+ // Verify addDataWithCallback() add data+metadata and callbacks are called correctly
val data2 = 11 to 20
val metadata2 = data2.map { _.toString }
data2.zip(metadata2).foreach { case (d, m) => blockGenerator.addDataWithCallback(d, m) }
@@ -103,7 +103,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
}
- // Verify addMultipleDataWithCallback() add data+metadata and and callbacks are called correctly
+ // Verify addMultipleDataWithCallback() add data+metadata and callbacks are called correctly
val data3 = 21 to 30
val metadata3 = "metadata"
blockGenerator.addMultipleDataWithCallback(data3.iterator, metadata3)