author    Reynold Xin <rxin@databricks.com>  2015-05-31 00:47:56 -0700
committer Reynold Xin <rxin@databricks.com>  2015-05-31 00:47:56 -0700
commit    564bc11e9827915c8652bc06f4bd591809dea4b1 (patch)
tree      1a3fe6ff9f78bef3d991897effe554098018a1b4 /extras
parent    74fdc97c7206c6d715f128ef7c46055e0bb90760 (diff)
[SPARK-3850] Trim trailing spaces for examples/streaming/yarn.
Author: Reynold Xin <rxin@databricks.com>

Closes #6530 from rxin/trim-whitespace-1 and squashes the following commits:

7b7b3a0 [Reynold Xin] Reset again.
dc14597 [Reynold Xin] Reset scalastyle.
cd556c4 [Reynold Xin] YARN, Kinesis, Flume.
4223fe1 [Reynold Xin] [SPARK-3850] Trim trailing spaces for examples/streaming.
Diffstat (limited to 'extras')
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala    | 12
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala  |  8
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala         |  8
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala  | 28
4 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 97c3476049..be8b62d3cc 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -119,7 +119,7 @@ object KinesisWordCountASL extends Logging {
val batchInterval = Milliseconds(2000)
// Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
- // on sequence number of records that have been received. Same as batchInterval for this
+ // on sequence number of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval
@@ -145,7 +145,7 @@ object KinesisWordCountASL extends Logging {
// Map each word to a (word, 1) tuple so we can reduce by key to count the words
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
-
+
// Print the first 10 wordCounts
wordCounts.print()
@@ -210,14 +210,14 @@ object KinesisWordProducerASL {
val randomWords = List("spark", "you", "are", "my", "father")
val totals = scala.collection.mutable.Map[String, Int]()
-
+
// Create the low-level Kinesis Client from the AWS Java SDK.
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpoint)
println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per record")
-
+
// Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
for (i <- 1 to 10) {
// Generate recordsPerSec records to put onto the stream
@@ -255,8 +255,8 @@ object KinesisWordProducerASL {
}
}
-/**
- * Utility functions for Spark Streaming examples.
+/**
+ * Utility functions for Spark Streaming examples.
* This has been lifted from the examples/ project to remove the circular dependency.
*/
private[streaming] object StreamingExamples extends Logging {
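[Editor's note] The hunks above change only trailing whitespace, but they pass through the example's core logic: a 2000 ms batch interval, a Kinesis checkpoint interval set equal to it, and a classic map/reduceByKey word count. A minimal sketch of that pipeline follows, with a socket source standing in for the Kinesis DStreams the real example creates (one per shard); the object name and the stand-in source are illustrative, not the example's actual setup.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KinesisWordCountSketch")
    // Same 2-second batch interval as the example above.
    val ssc = new StreamingContext(conf, Milliseconds(2000))

    // Stand-in source; KinesisWordCountASL unions one Kinesis DStream per shard.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    // Map each word to a (word, 1) tuple, then reduce by key to count the words.
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print() // prints the first 10 counts of each batch

    ssc.start()
    ssc.awaitTermination()
  }
}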
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 1c9b0c218a..83a4537559 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -23,20 +23,20 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock}
/**
* This is a helper class for managing checkpoint clocks.
*
- * @param checkpointInterval
+ * @param checkpointInterval
* @param currentClock. Default to current SystemClock if none is passed in (mocking purposes)
*/
private[kinesis] class KinesisCheckpointState(
- checkpointInterval: Duration,
+ checkpointInterval: Duration,
currentClock: Clock = new SystemClock())
extends Logging {
-
+
/* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
val checkpointClock = new ManualClock()
checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds)
/**
- * Check if it's time to checkpoint based on the current time and the derived time
+ * Check if it's time to checkpoint based on the current time and the derived time
* for the next checkpoint
*
* @return true if it's time to checkpoint
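[Editor's note] For orientation, the class touched here is a small deadline tracker: at construction it records now plus checkpointInterval on a ManualClock, and shouldCheckpoint() reports whether that derived time has been reached. A standalone sketch of the same idea, assuming plain wall-clock time in place of Spark's Clock/ManualClock abstraction; the names are illustrative, not the private API.

/** Minimal sketch of KinesisCheckpointState's deadline logic, using
 *  System.currentTimeMillis instead of Spark's pluggable Clock. */
class CheckpointStateSketch(checkpointIntervalMillis: Long) {
  // Deadline for the next checkpoint: now + interval, as in the diff above.
  private var nextCheckpointTimeMillis =
    System.currentTimeMillis() + checkpointIntervalMillis

  /** True once the current time has reached the derived checkpoint time. */
  def shouldCheckpoint(): Boolean =
    System.currentTimeMillis() >= nextCheckpointTimeMillis

  /** Push the deadline forward after a successful checkpoint. */
  def advanceCheckpoint(): Unit = {
    nextCheckpointTimeMillis = System.currentTimeMillis() + checkpointIntervalMillis
  }
}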
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 7dd8bfdc2a..1a8a4cecc1 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -44,12 +44,12 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
* https://github.com/awslabs/amazon-kinesis-client
* This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
* http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers to run within a
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a
* Spark Executor.
*
* @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
* by the Kinesis Client Library. If you change the App name or Stream name,
- * the KCL will throw errors. This usually requires deleting the backing
+ * the KCL will throw errors. This usually requires deleting the backing
* DynamoDB table with the same name this Kinesis application.
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -87,7 +87,7 @@ private[kinesis] class KinesisReceiver(
*/
/**
- * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
+ * workerId is used by the KCL should be based on the ip address of the actual Spark Worker
* where this code runs (not the driver's IP address.)
*/
private var workerId: String = null
@@ -121,7 +121,7 @@ private[kinesis] class KinesisReceiver(
/*
* RecordProcessorFactory creates impls of IRecordProcessor.
- * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+ * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
* IRecordProcessor.processRecords() method.
* We're using our custom KinesisRecordProcessor in this case.
*/
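[Editor's note] The scaladoc above describes a custom receiver registered through StreamingContext.receiverStream and shipped to executors. Applications in this Spark generation did not construct the private[kinesis] KinesisReceiver directly; they went through KinesisUtils. A hedged usage sketch follows; the createStream overloads changed across 1.x releases, so treat this parameter list as approximate rather than the exact API.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

// Checkpoint interval mirrors the batch interval, as in the example above.
def kinesisStream(ssc: StreamingContext) =
  KinesisUtils.createStream(
    ssc,
    "myKinesisStream",                          // stream name (placeholder)
    "https://kinesis.us-east-1.amazonaws.com",  // endpoint URL from the scaladoc
    Milliseconds(2000),                         // Kinesis checkpoint interval
    InitialPositionInStream.LATEST,
    StorageLevel.MEMORY_AND_DISK_2)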
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index f65e743c4e..fe9e3a0c79 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -35,9 +35,9 @@ import com.amazonaws.services.kinesis.model.Record
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
- * shard in the Kinesis stream upon startup. This is normally done in separate threads,
- * but the KCLs within the KinesisReceivers will balance themselves out if you create
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you create
* multiple Receivers.
*
* @param receiver Kinesis receiver
@@ -69,14 +69,14 @@ private[kinesis] class KinesisRecordProcessor(
* and Spark Streaming's Receiver.store().
*
* @param batch list of records from the Kinesis stream shard
- * @param checkpointer used to update Kinesis when this batch has been processed/stored
+ * @param checkpointer used to update Kinesis when this batch has been processed/stored
* in the DStream
*/
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
if (!receiver.isStopped()) {
try {
/*
- * Notes:
+ * Notes:
* 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
* Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
* internally-configured Spark serializer (kryo, etc).
@@ -84,19 +84,19 @@ private[kinesis] class KinesisRecordProcessor(
* ourselves from Spark's internal serialization strategy.
* 3) For performance, the BlockGenerator is asynchronously queuing elements within its
* memory before creating blocks. This prevents the small block scenario, but requires
- * that you register callbacks to know when a block has been generated and stored
+ * that you register callbacks to know when a block has been generated and stored
* (WAL is sufficient for storage) before can checkpoint back to the source.
*/
batch.foreach(record => receiver.store(record.getData().array()))
-
+
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
/*
- * Checkpoint the sequence number of the last record successfully processed/stored
+ * Checkpoint the sequence number of the last record successfully processed/stored
* in the batch.
* In this implementation, we're checkpointing after the given checkpointIntervalMillis.
- * Note that this logic requires that processRecords() be called AND that it's time to
- * checkpoint. I point this out because there is no background thread running the
+ * Note that this logic requires that processRecords() be called AND that it's time to
+ * checkpoint. I point this out because there is no background thread running the
* checkpointer. Checkpointing is tested and trigger only when a new batch comes in.
* If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
* However, if the worker dies unexpectedly, a checkpoint may not happen.
@@ -130,16 +130,16 @@ private[kinesis] class KinesisRecordProcessor(
}
} else {
/* RecordProcessor has been stopped. */
- logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
+ logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
s" and shardId $shardId. No more records will be processed.")
}
}
/**
* Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
- * 1) the stream is resharding by splitting or merging adjacent shards
+ * 1) the stream is resharding by splitting or merging adjacent shards
* (ShutdownReason.TERMINATE)
- * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
+ * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
* (ShutdownReason.ZOMBIE)
*
* @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
@@ -153,7 +153,7 @@ private[kinesis] class KinesisRecordProcessor(
* Checkpoint to indicate that all records from the shard have been drained and processed.
* It's now OK to read from the new shards that resulted from a resharding event.
*/
- case ShutdownReason.TERMINATE =>
+ case ShutdownReason.TERMINATE =>
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
/*
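[Editor's note] The diff is truncated inside the shutdown() handler, but the TERMINATE branch above shows the checkpoint path: retryRandom(checkpointer.checkpoint(), 4, 100), i.e. retry a by-name expression up to 4 times with a random backoff of at most 100 ms between attempts. A simplified sketch of that retry shape follows; the real helper also distinguishes throttling and shutdown exceptions, which this generic version omits.

import scala.util.Random

/** Retry a by-name expression up to numRetries times, sleeping a random number
 *  of milliseconds (bounded by maxBackOffMillis) between attempts. Simplified:
 *  the real KinesisRecordProcessor.retryRandom also inspects exception types. */
def retryRandomSketch[T](expression: => T, numRetries: Int, maxBackOffMillis: Int): T = {
  try {
    expression
  } catch {
    case e: Throwable if numRetries > 1 =>
      // Random jitter so many shards don't retry in lockstep.
      Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
      retryRandomSketch(expression, numRetries - 1, maxBackOffMillis)
  }
}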