aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2014-08-01 04:32:46 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-08-01 04:32:46 -0700
commita32f0fb73a739c56208cafcd9f08618fb6dd8859 (patch)
tree30d2b0faec80084693f798047b3dc5e89971b31f /external
parent284771efbef2d6b22212afd49dd62732a2cf52a8 (diff)
downloadspark-a32f0fb73a739c56208cafcd9f08618fb6dd8859.tar.gz
spark-a32f0fb73a739c56208cafcd9f08618fb6dd8859.tar.bz2
spark-a32f0fb73a739c56208cafcd9f08618fb6dd8859.zip
[SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue
This PR updates previous Manifest for KafkaInputDStream's Decoder to ClassTag, also fix the problem addressed in [SPARK-2103](https://issues.apache.org/jira/browse/SPARK-2103). Previous Java interface cannot actually get the type of Decoder, so when using this Manifest to reconstruct the decode object will meet reflection exception. Also for other two Java interfaces, ClassTag[String] is useless because calling Scala API will get the right implicit ClassTag. Current Kafka unit test cannot actually verify the interface. I've tested these interfaces in my local and distribute settings. Author: jerryshao <saisai.shao@intel.com> Closes #1508 from jerryshao/SPARK-2103 and squashes the following commits: e90c37b [jerryshao] Add Mima excludes 7529810 [jerryshao] Change Manifest to ClassTag for KafkaInputDStream's Decoder and fix Decoder construct issue when using Java API
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala14
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala16
2 files changed, 12 insertions, 18 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 38095e88dc..e20e2c8f26 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.kafka
import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import java.util.Properties
import java.util.concurrent.Executors
@@ -48,8 +48,8 @@ private[streaming]
class KafkaInputDStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
@@ -66,8 +66,8 @@ private[streaming]
class KafkaReceiver[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
@@ -103,10 +103,10 @@ class KafkaReceiver[
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}
- val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
- val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 86bb91f362..48668f763e 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -65,7 +65,7 @@ object KafkaUtils {
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
@@ -89,8 +89,6 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
@@ -111,8 +109,6 @@ object KafkaUtils {
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
@@ -140,13 +136,11 @@ object KafkaUtils {
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+ implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+ implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)