author     cody koeninger <cody@koeninger.org>    2016-06-30 13:16:58 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2016-06-30 13:16:58 -0700
commit  c62263340edb6976a10f274e716fde6cd2c5bf34 (patch)
tree    32a0446677686976ecba2757209f37699af58b9b /external
parent  46395db80e3304e3f3a1ebdc8aadb8f2819b48b4 (diff)
[SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback
## What changes were proposed in this pull request?

Code cleanup in kafka-0-8 to match the suggested changes for the kafka-0-10 branch.

## How was this patch tested?

Unit tests.

Author: cody koeninger <cody@koeninger.org>

Closes #13908 from koeninger/kafka-0-8-cleanup.
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala  | 12
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala                 |  9
-rw-r--r--  external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java |  5
3 files changed, 12 insertions, 14 deletions
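Taken together, the hunks below apply three conventions throughout: the logInfo/logError helpers mixed in from Spark's Logging trait instead of calls on a raw log field, explicit `: Unit =` result types instead of Scala's deprecated procedure syntax, and Scaladoc `[[...]]` links instead of Javadoc `{@link ...}` in Scala sources. A minimal, self-contained sketch of all three; the Logging trait here is a simplified stand-in for Spark's, not the actual implementation:

    // Simplified stand-in for Spark's Logging mixin (assumed by-name message helpers).
    trait Logging {
      def logInfo(msg: => String): Unit = println(s"INFO  $msg")
      def logError(msg: => String): Unit = println(s"ERROR $msg")
    }

    /**
     * In Scaladoc, link with [[KafkaRDD]] rather than Javadoc-style
     * {@link org.apache.spark.streaming.kafka.KafkaRDD}.
     */
    class Example extends Logging {
      // Explicit result type instead of procedure syntax `def update() { ... }`.
      def update(): Unit = {
        logInfo("state updated")    // trait helper instead of log.info(...)
      }

      def fail(err: String): Unit = {
        logError(err)               // trait helper instead of log.error(...)
      }
    }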
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index fb58ed7898..c3c799375b 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator
/**
- * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * A stream of [[KafkaRDD]] where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
* and this DStream is not responsible for committing offsets,
* so that you can control exactly-once semantics.
* For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * see [[KafkaCluster]]
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
if (retries <= 0) {
throw new SparkException(err)
} else {
- log.error(err)
+ logError(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
@@ -194,7 +194,7 @@ class DirectKafkaInputDStream[
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}
- override def update(time: Time) {
+ override def update(time: Time): Unit = {
batchForTime.clear()
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
@@ -202,9 +202,9 @@ class DirectKafkaInputDStream[
}
}
- override def cleanup(time: Time) { }
+ override def cleanup(time: Time): Unit = { }
- override def restore() {
+ override def restore(): Unit = {
// this is assuming that the topics don't change during execution, which is true currently
val topics = fromOffsets.keySet
val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
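That ends the DirectKafkaInputDStream changes. For context on the Scaladoc touched above, a hedged usage sketch of the kafka-0-8 direct stream it documents; the app name, broker address, batch interval, and topic name are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("direct-kafka-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // "metadata.broker.list" (or "bootstrap.servers") is required, as the Scaladoc notes.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // The stream defines its own offsets per batch and commits nothing,
    // leaving exactly-once semantics under the application's control.
    stream.foreachRDD { rdd => println(s"batch size: ${rdd.count()}") }

    ssc.start()
    ssc.awaitTermination()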
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index d4881b140d..2b925774a2 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -129,7 +129,7 @@ class KafkaRDD[
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
- log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+ logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
@@ -137,13 +137,16 @@ class KafkaRDD[
}
}
+ /**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ */
private class KafkaRDDIterator(
part: KafkaRDDPartition,
context: TaskContext) extends NextIterator[R] {
context.addTaskCompletionListener{ context => closeIfNeeded() }
- log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
val kc = new KafkaCluster(kafkaParams)
@@ -177,7 +180,7 @@ class KafkaRDD[
val err = resp.errorCode(part.topic, part.partition)
if (err == ErrorMapping.LeaderNotAvailableCode ||
err == ErrorMapping.NotLeaderForPartitionCode) {
- log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+ logError(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
Thread.sleep(kc.config.refreshLeaderBackoffMs)
}
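That ends the KafkaRDD changes. The KafkaRDDIterator documented above extends Spark's internal NextIterator, which factors the "compute one element ahead, close resources exactly once" bookkeeping out of hand-written iterators; closeIfNeeded is what the task-completion listener at the top of the class calls. A simplified, self-contained sketch of the pattern (an approximation, not Spark's actual class):

    // Simplified stand-in for the NextIterator pattern used by KafkaRDDIterator.
    abstract class NextIterator[U] extends Iterator[U] {
      private var gotNext = false
      private var nextValue: U = _
      private var closed = false
      protected var finished = false

      /** Produce the next value, or set `finished = true` when the source is drained. */
      protected def getNext(): U

      /** Release underlying resources (e.g. a Kafka consumer connection). */
      protected def close(): Unit

      /** Idempotent close; suitable for a task-completion callback. */
      def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }

      override def hasNext: Boolean = {
        if (!gotNext && !finished) {
          nextValue = getNext()
          gotNext = !finished
          if (finished) closeIfNeeded()
        }
        !finished
      }

      override def next(): U = {
        if (!hasNext) throw new NoSuchElementException("end of stream")
        gotNext = false
        nextValue
      }
    }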
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index fa6b0dbc8c..71404a7331 100644
--- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -135,11 +135,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
@Override
public void call(JavaRDD<String> rdd) {
result.addAll(rdd.collect());
- for (OffsetRange o : offsetRanges.get()) {
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
- );
- }
}
}
);
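The deleted lines were debug output only; the collected results the test asserts on are unchanged. The pattern they exercised, reading the exact per-partition offsets a direct-stream batch covered, is the HasOffsetRanges cast, sketched here in Scala with `stream` standing in for a direct stream created as above:

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Each KafkaRDD in a direct stream knows its offset ranges; capturing them
    // lets an application store offsets transactionally next to its results.
    stream.foreachRDD { rdd =>
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }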