diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-21 11:39:32 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-21 11:39:32 -0700 |
commit | 4b7ff3092c53827817079e0810563cbb0b9d0747 (patch) | |
tree | 846fc44b5ccf97d8cd9f2cda2881af17559e762d /extras/kinesis-asl | |
parent | 8730fbb47b09fcf955fe16dd03b75596db6d53b6 (diff) | |
download | spark-4b7ff3092c53827817079e0810563cbb0b9d0747.tar.gz spark-4b7ff3092c53827817079e0810563cbb0b9d0747.tar.bz2 spark-4b7ff3092c53827817079e0810563cbb0b9d0747.zip |
[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials
Lack of default constructor causes deserialization to fail. This occurs only when the AWS credentials are explicitly specified through KinesisUtils.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6316 from tdas/SPARK-7787 and squashes the following commits:
248ca5c [Tathagata Das] Fixed serializability
Diffstat (limited to 'extras/kinesis-asl')
2 files changed, 17 insertions, 18 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 90164490ef..800202e9fb 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -31,7 +31,10 @@ import org.apache.spark.util.Utils private[kinesis] case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) - extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable + extends AWSCredentials { + override def getAWSAccessKeyId: String = accessKeyId + override def getAWSSecretKey: String = secretKey +} /** * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index 7c17ee9dce..cd19c33b90 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -20,27 +20,18 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions.seqAsJavaList -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.Milliseconds -import org.apache.spark.streaming.Seconds -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.TestSuiteBase -import org.apache.spark.util.{ManualClock, Clock} - -import org.mockito.Mockito._ -import org.scalatest.BeforeAndAfter -import org.scalatest.Matchers -import org.scalatest.mock.MockitoSugar - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfter, Matchers} +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase} +import org.apache.spark.util.{Clock, ManualClock, Utils} /** * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor @@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft ssc.stop() } + test("check serializability of SerializableAWSCredentials") { + Utils.deserialize[SerializableAWSCredentials]( + Utils.serialize(new SerializableAWSCredentials("x", "y"))) + } + test("process records including store and checkpoint") { when(receiverMock.isStopped()).thenReturn(false) when(checkpointStateMock.shouldCheckpoint()).thenReturn(true) |