From 7478c8b66d6a2b1179f20c38b49e27e37b0caec3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 24 Aug 2015 12:40:09 -0700 Subject: [SPARK-9791] [PACKAGE] Change private class to private class to prevent unnecessary classes from showing up in the docs In addition, some random cleanup of import ordering Author: Tathagata Das Closes #8387 from tdas/SPARK-9791 and squashes the following commits: 67f3ee9 [Tathagata Das] Change private class to private[package] class to prevent them from showing up in the docs --- .../apache/spark/streaming/flume/FlumeUtils.scala | 2 +- .../org/apache/spark/streaming/kafka/Broker.scala | 6 ++-- .../spark/streaming/kafka/KafkaTestUtils.scala | 10 +++--- .../apache/spark/streaming/kafka/KafkaUtils.scala | 36 ++++++---------------- .../apache/spark/streaming/kafka/OffsetRange.scala | 8 ----- .../apache/spark/streaming/mqtt/MQTTUtils.scala | 6 ++-- .../spark/streaming/mqtt/MQTTTestUtils.scala | 2 +- 7 files changed, 20 insertions(+), 50 deletions(-) (limited to 'external') diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 095bfb0c73..a65a9b921a 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -247,7 +247,7 @@ object FlumeUtils { * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and * function so that it can be easily instantiated and called from Python's FlumeUtils. */ -private class FlumeUtilsPythonHelper { +private[flume] class FlumeUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala index 5a74febb4b..9159051ba0 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala @@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka import org.apache.spark.annotation.Experimental /** - * :: Experimental :: - * Represent the host and port info for a Kafka broker. - * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID + * Represents the host and port info for a Kafka broker. + * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID. */ -@Experimental final class Broker private( /** Broker's hostname */ val host: String, diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index b608b75952..79a9db4291 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -20,9 +20,8 @@ package org.apache.spark.streaming.kafka import java.io.File import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap} -import java.util.Properties import java.util.concurrent.TimeoutException +import java.util.{Map => JMap, Properties} import scala.annotation.tailrec import scala.language.postfixOps @@ -30,17 +29,16 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request -import kafka.common.TopicAndPartition import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} -import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} -import org.apache.spark.{Logging, SparkConf} import org.apache.spark.streaming.Time import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkConf} /** * This is a helper class for Kafka test suites. This has the functionality to set up @@ -48,7 +46,7 @@ import org.apache.spark.util.Utils * * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. */ -private class KafkaTestUtils extends Logging { +private[kafka] class KafkaTestUtils extends Logging { // Zookeeper related configurations private val zkHost = "localhost" diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index f3b01bd60b..388dbb8184 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -17,29 +17,25 @@ package org.apache.spark.streaming.kafka -import java.lang.{Integer => JInt} -import java.lang.{Long => JLong} -import java.util.{Map => JMap} -import java.util.{Set => JSet} -import java.util.{List => JList} +import java.lang.{Integer => JInt, Long => JLong} +import java.util.{List => JList, Map => JMap, Set => JSet} -import scala.reflect.ClassTag import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder} +import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.streaming.util.WriteAheadLogUtils -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaPairInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream} -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.streaming.util.WriteAheadLogUtils +import org.apache.spark.{SparkContext, SparkException} object KafkaUtils { /** @@ -196,7 +192,6 @@ object KafkaUtils { * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ - @Experimental def createRDD[ K: ClassTag, V: ClassTag, @@ -214,7 +209,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you * specify the Kafka leader to connect to (to optimize fetching) and access the message as well * as the metadata. @@ -230,7 +224,6 @@ object KafkaUtils { * in which case leaders will be looked up on the driver. * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createRDD[ K: ClassTag, V: ClassTag, @@ -268,7 +261,6 @@ object KafkaUtils { * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ - @Experimental def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( jsc: JavaSparkContext, keyClass: Class[K], @@ -287,7 +279,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you * specify the Kafka leader to connect to (to optimize fetching) and access the message as well * as the metadata. @@ -303,7 +294,6 @@ object KafkaUtils { * in which case leaders will be looked up on the driver. * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( jsc: JavaSparkContext, keyClass: Class[K], @@ -327,7 +317,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -357,7 +346,6 @@ object KafkaUtils { * starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, @@ -375,7 +363,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -405,7 +392,6 @@ object KafkaUtils { * to determine where the stream starts (defaults to "largest") * @param topics Names of the topics to consume */ - @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, @@ -437,7 +423,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -472,7 +457,6 @@ object KafkaUtils { * starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type */ - @Experimental def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( jssc: JavaStreamingContext, keyClass: Class[K], @@ -499,7 +483,6 @@ object KafkaUtils { } /** - * :: Experimental :: * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). @@ -533,7 +516,6 @@ object KafkaUtils { * to determine where the stream starts (defaults to "largest") * @param topics Names of the topics to consume */ - @Experimental def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]]( jssc: JavaStreamingContext, keyClass: Class[K], @@ -564,7 +546,7 @@ object KafkaUtils { * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream() * takes care of known parameters instead of passing them from Python */ -private class KafkaUtilsPythonHelper { +private[kafka] class KafkaUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, kafkaParams: JMap[String, String], diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 2f8981d489..8a5f371494 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -19,10 +19,7 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -import org.apache.spark.annotation.Experimental - /** - * :: Experimental :: * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the * offset ranges in RDDs generated by the direct Kafka DStream (see * [[KafkaUtils.createDirectStream()]]). @@ -33,13 +30,11 @@ import org.apache.spark.annotation.Experimental * } * }}} */ -@Experimental trait HasOffsetRanges { def offsetRanges: Array[OffsetRange] } /** - * :: Experimental :: * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class * can be created with `OffsetRange.create()`. * @param topic Kafka topic name @@ -47,7 +42,6 @@ trait HasOffsetRanges { * @param fromOffset Inclusive starting offset * @param untilOffset Exclusive ending offset */ -@Experimental final class OffsetRange private( val topic: String, val partition: Int, @@ -84,10 +78,8 @@ final class OffsetRange private( } /** - * :: Experimental :: * Companion object the provides methods to create instances of [[OffsetRange]]. */ -@Experimental object OffsetRange { def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 38a1114863..7b8d56d6fa 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -21,8 +21,8 @@ import scala.reflect.ClassTag import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream object MQTTUtils { /** @@ -79,7 +79,7 @@ object MQTTUtils { * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and * function so that it can be easily instantiated and called from Python's MQTTUtils. */ -private class MQTTUtilsPythonHelper { +private[mqtt] class MQTTUtilsPythonHelper { def createStream( jssc: JavaStreamingContext, diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 1a371b7008..1618e2c088 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -33,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf} /** * Share codes for Scala and Python unit tests */ -private class MQTTTestUtils extends Logging { +private[mqtt] class MQTTTestUtils extends Logging { private val persistenceDir = Utils.createTempDir() private val brokerHost = "localhost" -- cgit v1.2.3