author     cody koeninger <cody@koeninger.org>    2016-02-07 12:52:00 +0000
committer  Sean Owen <sowen@cloudera.com>         2016-02-07 12:52:00 +0000
commit     140ddef373680cb08a3948a883b172dc80814170 (patch)
tree       37171c20cd3fda2da1655dd7245fa0b1b4682708 /external
parent     bc8890b357811612ba6c10d96374902b9e08134f (diff)
[SPARK-10963][STREAMING][KAFKA] make KafkaCluster public
Author: cody koeninger <cody@koeninger.org>

Closes #9007 from koeninger/SPARK-10963.
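Since this commit turns KafkaCluster from private[spark] into a public @DeveloperApi, it can now be constructed directly from user code. The following is a minimal usage sketch, not part of the commit: the broker addresses, topic, partition, group id, and object name are hypothetical placeholders, while the constructor, getConsumerOffsets, setConsumerOffsets, and the Either[Err, ...] error convention are all taken from the diff below.

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

object KafkaClusterSketch {
  def main(args: Array[String]): Unit = {
    // "metadata.broker.list" must list Kafka broker(s), NOT ZooKeeper servers,
    // as the class scaladoc requires. The broker addresses are hypothetical.
    val kc = new KafkaCluster(
      Map("metadata.broker.list" -> "broker1:9092,broker2:9092"))

    // Hypothetical topic "events", partition 0, consumer group "my-group".
    val tps = Set(TopicAndPartition("events", 0))

    // Errors accumulate on the Left as Err (an ArrayBuffer[Throwable], per the
    // companion object); committed offsets come back on the Right.
    kc.getConsumerOffsets("my-group", tps) match {
      case Right(offsets) =>
        offsets.foreach { case (tp, off) => println(s"$tp is at offset $off") }
      case Left(errs) =>
        errs.foreach(err => println(s"offset fetch failed: $err"))
    }

    // Commit an offset through the original ZooKeeper backed api version
    // (the default); requires Kafka >= 0.8.1.1 per the scaladoc.
    kc.setConsumerOffsets("my-group", Map(TopicAndPartition("events", 0) -> 42L))
  }
}

Note that @DeveloperApi marks the class as public but unstable: it is intended for advanced users and may change between minor releases.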
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index d7885d7cc1..8a66621a31 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -29,15 +29,19 @@ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
/**
+ * :: DeveloperApi ::
* Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form
*/
-private[spark]
+@DeveloperApi
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
@@ -227,7 +231,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// this 0 here indicates api version, in this case the original ZK backed api.
private def defaultConsumerApiVersion: Short = 0
- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
@@ -246,7 +250,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}
- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
@@ -283,7 +287,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
Left(errs)
}
- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
@@ -301,7 +305,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}
- /** Requires Kafka >= 0.8.1.1 */
+ /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetAndMetadata]
@@ -359,7 +363,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}
-private[spark]
+@DeveloperApi
object KafkaCluster {
type Err = ArrayBuffer[Throwable]
@@ -371,7 +375,6 @@ object KafkaCluster {
)
}
- private[spark]
case class LeaderOffset(host: String, port: Int, offset: Long)
/**
@@ -379,7 +382,6 @@ object KafkaCluster {
* Simple consumers connect directly to brokers, but need many of the same configs.
* This subclass won't warn about missing ZK params, or presence of broker params.
*/
- private[spark]
class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
@@ -391,7 +393,6 @@ object KafkaCluster {
}
}
- private[spark]
object SimpleConsumerConfig {
/**
* Make a consumer config without requiring group.id or zookeeper.connect,