From d0212eb0f22473ee5482fe98dafc24e16ffcfc63 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 22 Nov 2016 16:49:15 -0800
Subject: [SPARK-18530][SS][KAFKA] Change Kafka timestamp column type to TimestampType

## What changes were proposed in this pull request?

Changed the type of the Kafka source's `timestamp` column from LongType (epoch milliseconds) to TimestampType.

## How was this patch tested?

`test("Kafka column types")` and `test("KafkaSource with watermark")`.

Author: Shixiong Zhu

Closes #15969 from zsxwing/SPARK-18530.
---
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 16 ++++-
 .../spark/sql/kafka010/KafkaSourceSuite.scala      | 81 +++++++++++++++++++++-
 2 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1d0d402b82..d9ab4bb4f8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,9 +32,12 @@ import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.UninterruptibleThread
 
 /**
@@ -282,7 +285,14 @@ private[kafka010] case class KafkaSource(
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
     val rdd = new KafkaSourceRDD(
       sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
-      Row(cr.key, cr.value, cr.topic, cr.partition, cr.offset, cr.timestamp, cr.timestampType.id)
+      InternalRow(
+        cr.key,
+        cr.value,
+        UTF8String.fromString(cr.topic),
+        cr.partition,
+        cr.offset,
+        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
+        cr.timestampType.id)
     }
 
     logInfo("GetBatch generating RDD of offset range: " +
@@ -293,7 +303,7 @@ private[kafka010] case class KafkaSource(
       currentPartitionOffsets = Some(untilPartitionOffsets)
     }
 
-    sqlContext.createDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd, schema)
   }
 
   /** Stop this source and free any resources it has allocated. */
@@ -496,7 +506,7 @@ private[kafka010] object KafkaSource {
     StructField("topic", StringType),
     StructField("partition", IntegerType),
     StructField("offset", LongType),
-    StructField("timestamp", LongType),
+    StructField("timestamp", TimestampType),
     StructField("timestampType", IntegerType)
   ))
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index cd52fd93d1..f9f62581a3 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.kafka010
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -551,6 +552,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("Kafka column types") {
+    val now = System.currentTimeMillis()
+    val topic = newTopic()
+    testUtils.createTopic(newTopic(), partitions = 1)
+    testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("startingOffsets", s"earliest")
+      .option("subscribe", topic)
+      .load()
+
+    val query = kafka
+      .writeStream
+      .format("memory")
+      .outputMode("append")
+      .queryName("kafkaColumnTypes")
+      .start()
+    query.processAllAvailable()
+    val rows = spark.table("kafkaColumnTypes").collect()
+    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    val row = rows(0)
+    assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
+    assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
+    assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
+    assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
+    assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
+    // We cannot check the exact timestamp as it's the time that messages were inserted by the
+    // producer. So here we just use a low bound to make sure the internal conversion works.
+    assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
+    assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
+    query.stop()
+  }
+
+  test("KafkaSource with watermark") {
+    val now = System.currentTimeMillis()
+    val topic = newTopic()
+    testUtils.createTopic(newTopic(), partitions = 1)
+    testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("startingOffsets", s"earliest")
+      .option("subscribe", topic)
+      .load()
+
+    val windowedAggregation = kafka
+      .withWatermark("timestamp", "10 seconds")
+      .groupBy(window($"timestamp", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start") as 'window, $"count")
+
+    val query = windowedAggregation
+      .writeStream
+      .format("memory")
+      .outputMode("complete")
+      .queryName("kafkaWatermark")
+      .start()
+    query.processAllAvailable()
+    val rows = spark.table("kafkaWatermark").collect()
+    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    val row = rows(0)
+    // We cannot check the exact window start time as it depends on the time that messages were
+    // inserted by the producer. So here we just use a low bound to make sure the internal
+    // conversion works.
+    assert(
+      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+      s"Unexpected results: $row")
+    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+    query.stop()
+  }
+
   private def testFromLatestOffsets(
       topic: String,
       addPartitions: Boolean,
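For context beyond the patch itself: because the source's `timestamp` column is now TimestampType rather than a LongType of epoch milliseconds, user queries can feed it straight into event-time operations, as the new `test("KafkaSource with watermark")` does. Below is a minimal, hypothetical usage sketch that is not part of this commit; the application name, broker address, topic, and console sink are placeholders, and it assumes the `spark-sql-kafka-0-10` artifact is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}

object KafkaEventTimeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-event-time-example") // placeholder name
      .getOrCreate()
    import spark.implicits._

    // The Kafka source now yields: key/value (binary), topic (string),
    // partition (int), offset (long), timestamp (timestamp), timestampType (int).
    val kafka = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                       // placeholder topic
      .load()

    // `timestamp` is a TimestampType column, so it can drive a watermark and
    // an event-time window directly, without any manual unit conversion.
    val counts = kafka
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window($"timestamp", "5 seconds"))
      .agg(count("*") as "count")

    val query = counts
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
    query.awaitTermination()
  }
}
```

Before this change, the same query needed a manual conversion along the lines of `(col("timestamp") / 1000).cast("timestamp")`, since casting the old millisecond Long directly to a timestamp would have interpreted the value as seconds.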