From 69c9c177160e32a2fbc9b36ecc52156077fca6fc Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 25 Aug 2015 12:33:13 +0100 Subject: [SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters Replace `JavaConversions` implicits with `JavaConverters` Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet. Author: Sean Owen Closes #8033 from srowen/SPARK-9613. --- .../spark/streaming/kafka/KafkaTestUtils.scala | 4 +-- .../apache/spark/streaming/kafka/KafkaUtils.scala | 35 ++++++++++++---------- 2 files changed, 21 insertions(+), 18 deletions(-) (limited to 'external/kafka/src/main') diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 79a9db4291..c9fd715d3d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException import java.util.{Map => JMap, Properties} import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.language.postfixOps import scala.util.control.NonFatal @@ -159,8 +160,7 @@ private[kafka] class KafkaTestUtils extends Logging { /** Java-friendly function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { - import scala.collection.JavaConversions._ - sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*)) + sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) } /** Send the messages to the Kafka broker */ diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 388dbb8184..3128222077 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka import java.lang.{Integer => JInt, Long => JLong} import java.util.{List => JList, Map => JMap, Set => JSet} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import kafka.common.TopicAndPartition @@ -96,7 +96,7 @@ object KafkaUtils { groupId: String, topics: JMap[String, JInt] ): JavaPairReceiverInputDStream[String, String] = { - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*)) } /** @@ -115,7 +115,7 @@ object KafkaUtils { topics: JMap[String, JInt], storageLevel: StorageLevel ): JavaPairReceiverInputDStream[String, String] = { - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), storageLevel) } @@ -149,7 +149,10 @@ object KafkaUtils { implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass) createStream[K, V, U, T]( - jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + jssc.ssc, + kafkaParams.asScala.toMap, + Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), + storageLevel) } /** get leaders for the given offset ranges, or throw an exception */ @@ -275,7 +278,7 @@ object KafkaUtils { implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) new JavaPairRDD(createRDD[K, V, KD, VD]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) + jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges)) } /** @@ -311,9 +314,9 @@ object KafkaUtils { implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) - val leaderMap = Map(leaders.toSeq: _*) + val leaderMap = Map(leaders.asScala.toSeq: _*) createRDD[K, V, KD, VD, R]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _) + jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_)) } /** @@ -476,8 +479,8 @@ object KafkaUtils { val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _) createDirectStream[K, V, KD, VD, R]( jssc.ssc, - Map(kafkaParams.toSeq: _*), - Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*), + Map(kafkaParams.asScala.toSeq: _*), + Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*), cleanedHandler ) } @@ -531,8 +534,8 @@ object KafkaUtils { implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) createDirectStream[K, V, KD, VD]( jssc.ssc, - Map(kafkaParams.toSeq: _*), - Set(topics.toSeq: _*) + Map(kafkaParams.asScala.toSeq: _*), + Set(topics.asScala.toSeq: _*) ) } } @@ -602,10 +605,10 @@ private[kafka] class KafkaUtilsPythonHelper { ): JavaPairInputDStream[Array[Byte], Array[Byte]] = { if (!fromOffsets.isEmpty) { - import scala.collection.JavaConversions._ - val topicsFromOffsets = fromOffsets.keySet().map(_.topic) - if (topicsFromOffsets != topics.toSet) { - throw new IllegalStateException(s"The specified topics: ${topics.toSet.mkString(" ")} " + + val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic) + if (topicsFromOffsets != topics.asScala.toSet) { + throw new IllegalStateException( + s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " + s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}") } } @@ -663,6 +666,6 @@ private[kafka] class KafkaUtilsPythonHelper { "with this RDD, please call this method only on a Kafka RDD.") val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]] - kafkaRDD.offsetRanges.toSeq + kafkaRDD.offsetRanges.toSeq.asJava } } -- cgit v1.2.3