author    Sean Owen <sowen@cloudera.com>  2015-08-25 12:33:13 +0100
committer Sean Owen <sowen@cloudera.com>  2015-08-25 12:33:13 +0100
commit    69c9c177160e32a2fbc9b36ecc52156077fca6fc (patch)
tree      57345aaf19c3149038bfca5c4ddccf33d41bdd5b /external/kafka
parent    7f1e507bf7e82bff323c5dec3c1ee044687c4173 (diff)
[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`. Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I can see, yet.

Author: Sean Owen <sowen@cloudera.com>

Closes #8033 from srowen/SPARK-9613.
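For context, a minimal sketch (not part of this patch) of the difference the migration makes: `JavaConversions` converts Java collections to Scala ones implicitly and invisibly, while `JavaConverters` requires an explicit `asScala`/`asJava` at each boundary.

    import java.util.{Arrays, List => JList}
    import scala.collection.JavaConverters._

    val jlist: JList[String] = Arrays.asList("a", "b")

    // Before (banned): `import scala.collection.JavaConversions._` would let
    // `jlist.head` compile by silently wrapping the Java list as a Scala Seq.

    // After: every crossing of the Java/Scala boundary is spelled out.
    val first = jlist.asScala.head                                      // Java -> Scala view
    val upper: JList[String] = jlist.asScala.map(_.toUpperCase).asJava  // and back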
Diffstat (limited to 'external/kafka')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala |  4
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala     | 35
2 files changed, 21 insertions(+), 18 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 79a9db4291..c9fd715d3d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException
import java.util.{Map => JMap, Properties}
import scala.annotation.tailrec
+import scala.collection.JavaConverters._
import scala.language.postfixOps
import scala.util.control.NonFatal
@@ -159,8 +160,7 @@ private[kafka] class KafkaTestUtils extends Logging {
/** Java-friendly function for sending messages to the Kafka broker */
def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
- import scala.collection.JavaConversions._
- sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*))
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
}
/** Send the messages to the Kafka broker */
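The replacement pattern above is worth a note: `asScala` only wraps the Java map, and `mapValues` is lazy, so the `toSeq`/`Map(...: _*)` step is what actually copies the pairs into an immutable Scala Map. A standalone sketch, assuming the Scala 2.x collections in use here:

    import java.lang.{Integer => JInt}
    import java.util.{Map => JMap}
    import scala.collection.JavaConverters._

    def toScalaMap(messageToFreq: JMap[String, JInt]): Map[String, Int] = {
      // asScala is an O(1) wrapper (a mutable.Map view over the Java map);
      // mapValues returns a lazy view; toSeq + Map(...: _*) forces a real,
      // immutable Scala Map decoupled from the original Java object.
      Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)
    }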
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 388dbb8184..3128222077 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
import java.lang.{Integer => JInt, Long => JLong}
import java.util.{List => JList, Map => JMap, Set => JSet}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import kafka.common.TopicAndPartition
@@ -96,7 +96,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
}
/**
@@ -115,7 +115,7 @@ object KafkaUtils {
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
@@ -149,7 +149,10 @@ object KafkaUtils {
implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
createStream[K, V, U, T](
- jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ jssc.ssc,
+ kafkaParams.asScala.toMap,
+ Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
}
/** get leaders for the given offset ranges, or throw an exception */
@@ -275,7 +278,7 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
new JavaPairRDD(createRDD[K, V, KD, VD](
- jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
}
/**
@@ -311,9 +314,9 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
- val leaderMap = Map(leaders.toSeq: _*)
+ val leaderMap = Map(leaders.asScala.toSeq: _*)
createRDD[K, V, KD, VD, R](
- jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _)
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_))
}
/**
@@ -476,8 +479,8 @@ object KafkaUtils {
val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
createDirectStream[K, V, KD, VD, R](
jssc.ssc,
- Map(kafkaParams.toSeq: _*),
- Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
cleanedHandler
)
}
@@ -531,8 +534,8 @@ object KafkaUtils {
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
createDirectStream[K, V, KD, VD](
jssc.ssc,
- Map(kafkaParams.toSeq: _*),
- Set(topics.toSeq: _*)
+ Map(kafkaParams.asScala.toSeq: _*),
+ Set(topics.asScala.toSeq: _*)
)
}
}
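One of the hunks above also swaps `messageHandler.call _` for `messageHandler.call(_)`. For a one-argument method the two forms are interchangeable, one being eta-expansion and the other placeholder syntax; a small sketch with hypothetical names:

    trait Handler[A, B] { def call(a: A): B }

    val h = new Handler[Int, String] { def call(a: Int): String = a.toString }
    val viaEta: Int => String         = h.call _    // eta-expansion to a method value
    val viaPlaceholder: Int => String = h.call(_)   // placeholder: x => h.call(x)
    assert(viaEta(7) == viaPlaceholder(7))          // both are plain Function1s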
@@ -602,10 +605,10 @@ private[kafka] class KafkaUtilsPythonHelper {
): JavaPairInputDStream[Array[Byte], Array[Byte]] = {
if (!fromOffsets.isEmpty) {
- import scala.collection.JavaConversions._
- val topicsFromOffsets = fromOffsets.keySet().map(_.topic)
- if (topicsFromOffsets != topics.toSet) {
- throw new IllegalStateException(s"The specified topics: ${topics.toSet.mkString(" ")} " +
+ val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
+ if (topicsFromOffsets != topics.asScala.toSet) {
+ throw new IllegalStateException(
+ s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
}
}
@@ -663,6 +666,6 @@ private[kafka] class KafkaUtilsPythonHelper {
"with this RDD, please call this method only on a Kafka RDD.")
val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
- kafkaRDD.offsetRanges.toSeq
+ kafkaRDD.offsetRanges.toSeq.asJava
}
}
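The final hunk runs in the other direction: a Scala `Seq` returned to Java (or, via Py4J, Python) callers now needs an explicit `asJava`. A minimal sketch with a hypothetical element type standing in for `OffsetRange`:

    import java.util.{List => JList}
    import scala.collection.JavaConverters._

    // Hypothetical stand-in for OffsetRange; the conversion is what matters.
    case class Range(fromOffset: Long, untilOffset: Long)

    def rangesForJava(ranges: Seq[Range]): JList[Range] =
      ranges.asJava   // O(1) wrapper view over the Seq, not a copy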