aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-21 11:39:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-21 11:39:32 -0700
commit4b7ff3092c53827817079e0810563cbb0b9d0747 (patch)
tree846fc44b5ccf97d8cd9f2cda2881af17559e762d /extras
parent8730fbb47b09fcf955fe16dd03b75596db6d53b6 (diff)
downloadspark-4b7ff3092c53827817079e0810563cbb0b9d0747.tar.gz
spark-4b7ff3092c53827817079e0810563cbb0b9d0747.tar.bz2
spark-4b7ff3092c53827817079e0810563cbb0b9d0747.zip
[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials
Lack of default constructor causes deserialization to fail. This occurs only when the AWS credentials are explicitly specified through KinesisUtils. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6316 from tdas/SPARK-7787 and squashes the following commits: 248ca5c [Tathagata Das] Fixed serializability
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala5
-rw-r--r--extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala30
2 files changed, 17 insertions, 18 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 90164490ef..800202e9fb 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -31,7 +31,10 @@ import org.apache.spark.util.Utils
private[kinesis]
case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
- extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
+ extends AWSCredentials {
+ override def getAWSAccessKeyId: String = accessKeyId
+ override def getAWSSecretKey: String = secretKey
+}
/**
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 7c17ee9dce..cd19c33b90 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,27 +20,18 @@ import java.nio.ByteBuffer
import scala.collection.JavaConversions.seqAsJavaList
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Milliseconds
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.util.{ManualClock, Clock}
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.Matchers
-import org.scalatest.mock.MockitoSugar
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
@@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
ssc.stop()
}
+ test("check serializability of SerializableAWSCredentials") {
+ Utils.deserialize[SerializableAWSCredentials](
+ Utils.serialize(new SerializableAWSCredentials("x", "y")))
+ }
+
test("process records including store and checkpoint") {
when(receiverMock.isStopped()).thenReturn(false)
when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)