author     cody koeninger <cody@koeninger.org>          2015-05-01 17:54:56 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-01 17:54:56 -0700
commit     4786484076865c56c3fc23c49819b9be2933d287 (patch)
tree       32d44db8cd54dcc1a7a13c0501a71b5618cf0e3a /external
parent     b88c275e6ef6b17cd34d1c2c780b8959b41222c0 (diff)
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
i don't think this should be merged until after 1.3.0 is final

Author: cody koeninger <cody@koeninger.org>
Author: Helena Edelson <helena.edelson@datastax.com>

Closes #4537 from koeninger/wip-2808-kafka-0.8.2-upgrade and squashes the following commits:

803aa2c [cody koeninger] [SPARK-2808][Streaming][Kafka] code cleanup per TD
e6dfaf6 [cody koeninger] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again
1770abc [cody koeninger] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well
d4267e9 [cody koeninger] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script
30d991d [cody koeninger] [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax
1d896e2 [cody koeninger] [SPARK-2808][Streaming][Kafka] add even even more logging to python test
4c4557f [cody koeninger] [SPARK-2808][Streaming][Kafka] add even more logging to python test
115aeee [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
2712649 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more logging to python test, see why its timing out in jenkins
2b92d3f [cody koeninger] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well
3824ce3 [cody koeninger] [SPARK-2808][Streaming][Kafka] naming / comments per tdas
61b3464 [cody koeninger] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test
af6f3ec [cody koeninger] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value
9edab4c [cody koeninger] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test
c70ee43 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally
1d10751 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
ed02d2c [cody koeninger] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat
407382e [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
77de6c2 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
6953429 [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
2e67c66 [Helena Edelson] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
d9dc2bc [Helena Edelson] Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.8.2-upgrade
e768164 [Helena Edelson] #2808 update kafka to version 0.8.2
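Of the commits above, ed02d2c is the binary-compatibility piece: rather than giving the api version a Scala default argument (which would replace the existing method signature on the JVM and break callers compiled against it), the patch keeps the old signatures as real methods that forward to new, wider overloads. A minimal sketch of that pattern, using generic names rather than the patch's own:

class Example {
  // mirrors the patch's defaultConsumerApiVersion: 0 selects the
  // original ZooKeeper-backed offset api
  private def defaultVersion: Short = 0

  // pre-existing signature, kept intact so old bytecode still links
  def lookup(group: String): String =
    lookup(group, defaultVersion)

  // new overload carrying the extra parameter
  def lookup(group: String, version: Short): String =
    s"$group@v$version"
}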
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/pom.xml                                                                 2
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala     51
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala   35
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java   3
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala    32
5 files changed, 99 insertions, 24 deletions
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index f695cff410..243ce6eaca 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.1.1</version>
+ <version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
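The pom.xml change is just the version bump above. For anyone tracking the same upgrade in an sbt build, the equivalent coordinate would look like this (a sketch, not part of the patch; it assumes the same org.apache.kafka / kafka_<scala-version> artifact the pom references):

// build.sbt sketch of the same dependency bump
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"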
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index bd767031c1..6cf254a7b6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -20,9 +20,10 @@ package org.apache.spark.streaming.kafka
import scala.util.control.NonFatal
import scala.util.Random
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import java.util.Properties
import kafka.api._
-import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException
@@ -220,12 +221,22 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
// scalastyle:on
+ // this 0 here indicates api version, in this case the original ZK backed api.
+ private def defaultConsumerApiVersion: Short = 0
+
/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, Long]] =
+ getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsets(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Long]] = {
- getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
r.map { kv =>
kv._1 -> kv._2.offset
}
@@ -236,9 +247,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
+ ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+ getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+ def getConsumerOffsetMetadata(
+ groupId: String,
+ topicAndPartitions: Set[TopicAndPartition],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
var result = Map[TopicAndPartition, OffsetMetadataAndError]()
- val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+ val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.fetchOffsets(req)
@@ -266,24 +284,39 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+ def setConsumerOffsets(
+ groupId: String,
+ offsets: Map[TopicAndPartition, Long],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
- setConsumerOffsetMetadata(groupId, offsets.map { kv =>
- kv._1 -> OffsetMetadataAndError(kv._2)
- })
+ val meta = offsets.map { kv =>
+ kv._1 -> OffsetAndMetadata(kv._2)
+ }
+ setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}
/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsetMetadata(
groupId: String,
- metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+ metadata: Map[TopicAndPartition, OffsetAndMetadata]
+ ): Either[Err, Map[TopicAndPartition, Short]] =
+ setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+ def setConsumerOffsetMetadata(
+ groupId: String,
+ metadata: Map[TopicAndPartition, OffsetAndMetadata],
+ consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
var result = Map[TopicAndPartition, Short]()
- val req = OffsetCommitRequest(groupId, metadata)
+ val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
val errs = new Err
val topicAndPartitions = metadata.keySet
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.commitOffsets(req)
- val respMap = resp.requestInfo
+ val respMap = resp.commitStatus
val needed = topicAndPartitions.diff(result.keySet)
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { err: Short =>
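From the caller's side, the KafkaCluster change looks like the following minimal sketch (broker address, group id, topic, and offsets are assumptions, and it presumes KafkaCluster is visible to the calling code). Version 0 is the original ZooKeeper-backed offset storage the patch keeps as the default; version 1 is the Kafka-backed storage 0.8.2 introduces:

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
val tp = TopicAndPartition("mytopic", 0)
val group = "my-group"

// old two-argument calls resolve to the forwarding overloads,
// so existing bytecode keeps working (api version 0)
val zkBacked = kc.getConsumerOffsets(group, Set(tp))

// new callers can opt into the Kafka-backed storage explicitly
kc.setConsumerOffsets(group, Map(tp -> 42L), 1.toShort)
val kafkaBacked = kc.getConsumerOffsets(group, Set(tp), 1.toShort)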
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 13e9475065..6dc4e9517d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -29,10 +29,12 @@ import scala.language.postfixOps
import scala.util.control.NonFatal
import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.common.TopicAndPartition
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZKStringSerializer
+import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.I0Itec.zkclient.ZkClient
@@ -227,12 +229,35 @@ private class KafkaTestUtils extends Logging {
tryAgain(1)
}
- private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ /** Wait until the leader offset for the given topic/partition equals the specified offset */
+ def waitUntilLeaderOffset(
+ topic: String,
+ partition: Int,
+ offset: Long): Unit = {
eventually(Time(10000), Time(100)) {
+ val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
+ val tp = TopicAndPartition(topic, partition)
+ val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
assert(
- server.apis.metadataCache.containsTopicAndPartition(topic, partition),
- s"Partition [$topic, $partition] metadata not propagated after timeout"
- )
+ llo == offset,
+ s"$topic $partition $offset not reached after timeout")
+ }
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
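The new public helper exists because the tests were racing the broker: a send followed immediately by a read could observe a leader offset that hadn't caught up yet. A hedged sketch of the intended test-side usage (topic name and messages are assumptions; kafkaTestUtils is an initialized KafkaTestUtils):

val topic = "example-topic"
kafkaTestUtils.createTopic(topic)

val messages = Array("a", "b", "c")
kafkaTestUtils.sendMessages(topic, messages)

// blocks (via eventually, up to 10 seconds) until partition 0's latest
// leader offset equals messages.length, so a subsequent read or count
// assertion cannot race the send
kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.length)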
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index a9dc6e5061..5cf3796353 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -72,6 +72,9 @@ public class JavaKafkaRDDSuite implements Serializable {
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
+ kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
+ kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
+
OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
OffsetRange.create(topic2, 0, 0, 1)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 7d26ce5087..39c3fb448f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -53,14 +53,15 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
}
test("basic usage") {
- val topic = "topicbasic"
+ val topic = s"topicbasic-${Random.nextInt}"
kafkaTestUtils.createTopic(topic)
val messages = Set("the", "quick", "brown", "fox")
kafkaTestUtils.sendMessages(topic, messages.toArray)
-
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+ "group.id" -> s"test-consumer-${Random.nextInt}")
+
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
@@ -73,27 +74,38 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
- val topic = "topic1"
+ val topic = s"topicboundary-${Random.nextInt}"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+ "group.id" -> s"test-consumer-${Random.nextInt}")
val kc = new KafkaCluster(kafkaParams)
// this is the "lots of messages" case
kafkaTestUtils.sendMessages(topic, sent)
+ val sentCount = sent.values.sum
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
+
// rdd defined from leaders after sending messages, should get the number sent
val rdd = getRdd(kc, Set(topic))
assert(rdd.isDefined)
- assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
- val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
- .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
- kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
+ assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+ assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+ val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+ // make sure consumer offsets are committed before the next getRdd call
+ kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
+ err => throw new Exception(err.mkString("\n")),
+ _ => ()
+ )
// this is the "0 messages" case
val rdd2 = getRdd(kc, Set(topic))
@@ -101,6 +113,8 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
val sentOnlyOne = Map("d" -> 1)
kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
+
assert(rdd2.isDefined)
assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
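The reworked boundary test now checks the message count two ways, via rdd.count and via the total width of the RDD's offset ranges, which localizes a failure to either the read path or the range computation. As a standalone sketch of that accounting (rdd is assumed to be a KafkaRDD obtained elsewhere, e.g. from KafkaUtils.createRDD):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// cross-check the record count against the offset-range arithmetic
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
assert(rangeCount == rdd.count, "offset ranges should cover exactly the records read")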