path: root/external/kafka-0-10-sql
author    Roberto Agostino Vitillo <ra.vitillo@gmail.com>    2017-02-17 11:43:57 -0800
committer Shixiong Zhu <shixiong@databricks.com>             2017-02-17 11:44:18 -0800
commit    1a3f5f8c55d873aaf8145a8bc4867fc9902cf93d (patch)
tree      495c962a3981a854258453f92c7417fd347557bc /external/kafka-0-10-sql
parent    4cc06f4eb1fbf1ba1fc6165783e22f93dc3b14ac (diff)
[SPARK-19517][SS] KafkaSource fails to initialize partition offsets
## What changes were proposed in this pull request?

This patch fixes a bug in `KafkaSource` with the (de)serialization of the length of the JSON string that contains the initial partition offsets.

## How was this patch tested?

I ran the test suite for spark-sql-kafka-0-10.

Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>

Closes #16857 from vitillo/kafka_source_fix.
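
A minimal, self-contained sketch (not the Spark code itself; the object name and the 300-byte payload are made up) of why the old layout breaks: `OutputStream.write(int)` emits only the low-order eight bits of its argument, so the length of any offsets JSON longer than 255 bytes is silently truncated on write, and the old `deserialize` then reads back the wrong number of bytes:

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8

// Illustration of the pre-fix length handling, not the actual KafkaSource code.
object LengthByteTruncation extends App {
  val json  = "x" * 300                   // stand-in for a large offsets JSON
  val bytes = json.getBytes(UTF_8)

  val out = new ByteArrayOutputStream()
  out.write(bytes.length)                 // write(int) keeps only 300 & 0xFF = 44
  out.write(bytes)

  val lengthOnDisk = out.toByteArray.head & 0xFF
  println(s"real length = ${bytes.length}, length byte on disk = $lengthOnDisk")
}
```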
Diffstat (limited to 'external/kafka-0-10-sql')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala      |  32
-rw-r--r--  external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin    |   1
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 104
3 files changed, 130 insertions, 7 deletions
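
For orientation before the diff (commentary, not part of the patch): the new on-disk layout is a single zero byte, the ASCII version tag `v1\n`, and then the offsets JSON. The leading zero byte makes a pre-fix Spark 2.1.0 reader see a length of 0 and fail fast instead of loading a corrupt offset, which is what the first new test below verifies. A rough round-trip sketch using plain `java.io` streams in place of `HDFSMetadataLog`:

```scala
import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source

object VersionedOffsetLayout extends App {
  val VERSION = "v1\n"
  val json    = """{"topic-a":{"0":0,"1":0}}"""  // hypothetical offsets JSON

  // serialize: compatibility zero byte, version tag, JSON payload.
  val out = new ByteArrayOutputStream()
  out.write(0)                                   // seen as "length 0" by Spark 2.1.0
  val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
  writer.write(VERSION)
  writer.write(json)
  writer.flush()

  // deserialize: skip the zero byte, check the version tag, recover the JSON.
  val in = new ByteArrayInputStream(out.toByteArray)
  in.read()
  val content = Source.fromInputStream(in, "UTF-8").mkString
  assert(content.startsWith(VERSION))
  println(content.substring(VERSION.length))     // prints the original JSON
}
```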
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 9c5dceca2d..92b5d91ba4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
import java.io._
import java.nio.charset.StandardCharsets
+import org.apache.commons.io.IOUtils
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
@@ -97,16 +98,31 @@ private[kafka010] class KafkaSource(
val metadataLog =
new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
- val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
- out.write(bytes.length)
- out.write(bytes)
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write(VERSION)
+ writer.write(metadata.json)
+ writer.flush
}
override def deserialize(in: InputStream): KafkaSourceOffset = {
- val length = in.read()
- val bytes = new Array[Byte](length)
- in.read(bytes)
- KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ if (content.startsWith(VERSION)) {
+ KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
+ } else {
+ val versionInFile = content.substring(0, content.indexOf("\n"))
+ throw new IllegalStateException(
+ s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
+ s"but was $versionInFile. Please upgrade your Spark.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
}
}
@@ -335,6 +351,8 @@ private[kafka010] object KafkaSource {
| source option "failOnDataLoss" to "false".
""".stripMargin
+ private val VERSION = "v1\n"
+
def getSortedExecutorList(sc: SparkContext): Array[String] = {
val bm = sc.env.blockManager
bm.master.getPeers(bm.blockManagerId).toArray
diff --git a/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
new file mode 100644
index 0000000000..ae928e7249
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
@@ -0,0 +1 @@
+2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}} \ No newline at end of file
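
A side note on the test resource above (commentary, not part of the patch): it is an offset log in the pre-fix Spark 2.1.0 layout, a single length byte followed by the JSON. The JSON here is 50 bytes long, and byte value 50 is ASCII '2', which is why the file appears to start with a literal '2'. A hypothetical way to regenerate such a file:

```scala
import java.io.FileOutputStream
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical regeneration of the legacy-format test resource.
object WriteLegacyInitialOffset extends App {
  val json  = """{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}"""
  val bytes = json.getBytes(UTF_8)
  val out   = new FileOutputStream("kafka-source-initial-offset-version-2.1.0.bin")
  out.write(bytes.length)  // 50 == 0x32 == ASCII '2'
  out.write(bytes)
  out.close()
}
```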
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 211c8a5e73..4f82b133cb 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.kafka010
+import java.io._
import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
import java.util.Properties
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -141,6 +143,108 @@ class KafkaSourceSuite extends KafkaSourceTest {
private val topicId = new AtomicInteger(0)
+ testWithUninterruptibleThread(
+ "deserialization of initial offset with Spark 2.1.0") {
+ withTempDir { metadataPath =>
+ val topic = newTopic
+ testUtils.createTopic(topic, partitions = 3)
+
+ val provider = new KafkaSourceProvider
+ val parameters = Map(
+ "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+ "subscribe" -> topic
+ )
+ val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+ "", parameters)
+ source.getOffset.get // Write initial offset
+
+ // Make sure Spark 2.1.0 will throw an exception when reading the new log
+ intercept[java.lang.IllegalArgumentException] {
+ // Simulate how Spark 2.1.0 reads the log
+ val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
+ val length = in.read()
+ val bytes = new Array[Byte](length)
+ in.read(bytes)
+ KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+ }
+ }
+ }
+
+ testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
+ withTempDir { metadataPath =>
+ val topic = "kafka-initial-offset-2-1-0"
+ testUtils.createTopic(topic, partitions = 3)
+
+ val provider = new KafkaSourceProvider
+ val parameters = Map(
+ "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+ "subscribe" -> topic
+ )
+
+ val from = Paths.get(
+ getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+ val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
+ Files.copy(from, to)
+
+ val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+ "", parameters)
+ val deserializedOffset = source.getOffset.get
+ val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+ assert(referenceOffset == deserializedOffset)
+ }
+ }
+
+ testWithUninterruptibleThread("deserialization of initial offset written by future version") {
+ withTempDir { metadataPath =>
+ val futureMetadataLog =
+ new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
+ metadataPath.getAbsolutePath) {
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
+ writer.write(s"v0\n${metadata.json}")
+ writer.flush
+ }
+ }
+
+ val topic = newTopic
+ testUtils.createTopic(topic, partitions = 3)
+ val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+ futureMetadataLog.add(0, offset)
+
+ val provider = new KafkaSourceProvider
+ val parameters = Map(
+ "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+ "subscribe" -> topic
+ )
+ val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+ "", parameters)
+
+ val e = intercept[java.lang.IllegalStateException] {
+ source.getOffset.get // Read initial offset
+ }
+
+ assert(e.getMessage.contains("Please upgrade your Spark"))
+ }
+ }
+
+ test("(de)serialization of initial offsets") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 64)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+
+ testStream(reader.load)(
+ makeSureGetOffsetCalled,
+ StopStream,
+ StartStream(),
+ StopStream)
+ }
+
test("maxOffsetsPerTrigger") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)