about summary refs log tree commit diff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 14
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 16
2 files changed, 12 insertions(+), 18 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 38095e88dc..e20e2c8f26 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.kafka
import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import java.util.Properties
import java.util.concurrent.Executors
@@ -48,8 +48,8 @@ private[streaming]
class KafkaInputDStream[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
@@ -66,8 +66,8 @@ private[streaming]
class KafkaReceiver[
K: ClassTag,
V: ClassTag,
- U <: Decoder[_]: Manifest,
- T <: Decoder[_]: Manifest](
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
@@ -103,10 +103,10 @@ class KafkaReceiver[
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}
- val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
- val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 86bb91f362..48668f763e 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -65,7 +65,7 @@ object KafkaUtils {
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
@@ -89,8 +89,6 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
@@ -111,8 +109,6 @@ object KafkaUtils {
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
@@ -140,13 +136,11 @@ object KafkaUtils {
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+ implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+ implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)