aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-11-16 10:16:36 +0000
committerSean Owen <sowen@cloudera.com>2016-11-16 10:16:36 +0000
commit43a26899e5dd2364297eaf8985bd68367e4735a7 (patch)
tree7d6ebf44636fd8daf6ff0ecce8c15f9938c3213d /external
parent3e01f128284993f39463c0ccd902b774f57cce76 (diff)
downloadspark-43a26899e5dd2364297eaf8985bd68367e4735a7.tar.gz
spark-43a26899e5dd2364297eaf8985bd68367e4735a7.tar.bz2
spark-43a26899e5dd2364297eaf8985bd68367e4735a7.zip
[SPARK-18400][STREAMING] NPE when resharding Kinesis Stream
## What changes were proposed in this pull request?

Avoid NPE in KinesisRecordProcessor when shutdown happens without successful init

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #15882 from srowen/SPARK-18400.
Diffstat (limited to 'external')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala42
1 file changed, 23 insertions(+), 19 deletions(-)
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 80e0cce055..a0ccd086d9 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Duration
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
@@ -102,27 +101,32 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
* @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
* @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
*/
- override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+ override def shutdown(
+ checkpointer: IRecordProcessorCheckpointer,
+ reason: ShutdownReason): Unit = {
logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
- reason match {
- /*
- * TERMINATE Use Case. Checkpoint.
- * Checkpoint to indicate that all records from the shard have been drained and processed.
- * It's now OK to read from the new shards that resulted from a resharding event.
- */
- case ShutdownReason.TERMINATE =>
- receiver.removeCheckpointer(shardId, checkpointer)
+ // null if not initialized before shutdown:
+ if (shardId == null) {
+ logWarning(s"No shardId for workerId $workerId?")
+ } else {
+ reason match {
+ /*
+ * TERMINATE Use Case. Checkpoint.
+ * Checkpoint to indicate that all records from the shard have been drained and processed.
+ * It's now OK to read from the new shards that resulted from a resharding event.
+ */
+ case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId, checkpointer)
- /*
- * ZOMBIE Use Case or Unknown reason. NoOp.
- * No checkpoint because other workers may have taken over and already started processing
- * the same records.
- * This may lead to records being processed more than once.
- */
- case _ =>
- receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
+ /*
+ * ZOMBIE Use Case or Unknown reason. NoOp.
+ * No checkpoint because other workers may have taken over and already started processing
+ * the same records.
+ * This may lead to records being processed more than once.
+ * Return null so that we don't checkpoint
+ */
+ case _ => receiver.removeCheckpointer(shardId, null)
+ }
}
-
}
}