path: root/external
authorShixiong Zhu <shixiong@databricks.com>2016-11-22 16:49:15 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-11-22 16:49:15 -0800
commitd0212eb0f22473ee5482fe98dafc24e16ffcfc63 (patch)
tree38df9e31556aaa7faf22d5dd0c0bd7f0733124c5 /external
parent39a1d30636857715247c82d551b200e1c331ad69 (diff)
downloadspark-d0212eb0f22473ee5482fe98dafc24e16ffcfc63.tar.gz
spark-d0212eb0f22473ee5482fe98dafc24e16ffcfc63.tar.bz2
spark-d0212eb0f22473ee5482fe98dafc24e16ffcfc63.zip
[SPARK-18530][SS][KAFKA] Change Kafka timestamp column type to TimestampType
## What changes were proposed in this pull request?

Changed the Kafka timestamp column type to TimestampType.

## How was this patch tested?

`test("Kafka column types")`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15969 from zsxwing/SPARK-18530.
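Because the source schema now exposes `timestamp` as `TimestampType` rather than a raw `LongType` of epoch milliseconds, event-time operators can consume the column directly. Below is a minimal usage sketch, mirroring the watermark test added in this patch; it is not part of the patch itself, the broker address `host:port` and topic `events` are placeholders, and an active `SparkSession` named `spark` is assumed.

```scala
// Sketch only; not part of the patch. Assumes an active SparkSession named `spark`.
import org.apache.spark.sql.functions.{count, window}
import spark.implicits._

val kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")  // placeholder broker address
  .option("subscribe", "events")                   // placeholder topic
  .load()

// Works now that "timestamp" is TimestampType instead of a Long of epoch millis.
val windowedCounts = kafka
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"))
  .agg(count("*") as "count")
```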
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala      | 16
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 81
2 files changed, 93 insertions(+), 4 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1d0d402b82..d9ab4bb4f8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,9 +32,12 @@ import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.UninterruptibleThread
/**
@@ -282,7 +285,14 @@ private[kafka010] case class KafkaSource(
// Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
val rdd = new KafkaSourceRDD(
sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
- Row(cr.key, cr.value, cr.topic, cr.partition, cr.offset, cr.timestamp, cr.timestampType.id)
+      InternalRow(
+        cr.key,
+        cr.value,
+        UTF8String.fromString(cr.topic),
+        cr.partition,
+        cr.offset,
+        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
+        cr.timestampType.id)
}
logInfo("GetBatch generating RDD of offset range: " +
@@ -293,7 +303,7 @@ private[kafka010] case class KafkaSource(
currentPartitionOffsets = Some(untilPartitionOffsets)
}
- sqlContext.createDataFrame(rdd, schema)
+ sqlContext.internalCreateDataFrame(rdd, schema)
}
/** Stop this source and free any resources it has allocated. */
@@ -496,7 +506,7 @@ private[kafka010] object KafkaSource {
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
- StructField("timestamp", LongType),
+ StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
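For reference on the conversions introduced in the hunk above: Kafka's `ConsumerRecord.timestamp()` returns epoch milliseconds, Catalyst stores `TimestampType` values internally as microseconds since the epoch, and `InternalRow` expects `UTF8String` rather than `java.lang.String` for string columns. A small sketch of just those value-level conversions follows; the literal values are illustrative, not taken from the patch.

```scala
// Illustrative sketch of the conversions used when building the InternalRow above.
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

val kafkaTimestampMs = 1479862155000L  // illustrative epoch-millis value, as returned by ConsumerRecord.timestamp()

// TimestampType values are stored as microseconds since the epoch, so the millisecond
// value is routed through java.sql.Timestamp and converted by DateTimeUtils.
val catalystMicros: Long =
  DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(kafkaTimestampMs))

// String columns in an InternalRow must be UTF8String, hence the topic conversion.
val topicUtf8: UTF8String = UTF8String.fromString("events")  // placeholder topic name
```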
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index cd52fd93d1..f9f62581a3 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.kafka010
+import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext
@@ -551,6 +552,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}
+ test("Kafka column types") {
+ val now = System.currentTimeMillis()
+ val topic = newTopic()
+ testUtils.createTopic(newTopic(), partitions = 1)
+ testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("startingOffsets", s"earliest")
+ .option("subscribe", topic)
+ .load()
+
+ val query = kafka
+ .writeStream
+ .format("memory")
+ .outputMode("append")
+ .queryName("kafkaColumnTypes")
+ .start()
+ query.processAllAvailable()
+ val rows = spark.table("kafkaColumnTypes").collect()
+ assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+ val row = rows(0)
+ assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
+ assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
+ assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
+ assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
+ assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
+ // We cannot check the exact timestamp as it's the time that messages were inserted by the
+ // producer. So here we just use a low bound to make sure the internal conversion works.
+ assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
+ assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
+ query.stop()
+ }
+
+ test("KafkaSource with watermark") {
+ val now = System.currentTimeMillis()
+ val topic = newTopic()
+ testUtils.createTopic(newTopic(), partitions = 1)
+ testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("startingOffsets", s"earliest")
+ .option("subscribe", topic)
+ .load()
+
+ val windowedAggregation = kafka
+ .withWatermark("timestamp", "10 seconds")
+ .groupBy(window($"timestamp", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start") as 'window, $"count")
+
+ val query = windowedAggregation
+ .writeStream
+ .format("memory")
+ .outputMode("complete")
+ .queryName("kafkaWatermark")
+ .start()
+ query.processAllAvailable()
+ val rows = spark.table("kafkaWatermark").collect()
+ assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+ val row = rows(0)
+ // We cannot check the exact window start time as it depands on the time that messages were
+ // inserted by the producer. So here we just use a low bound to make sure the internal
+ // conversion works.
+ assert(
+ row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+ s"Unexpected results: $row")
+ assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+ query.stop()
+ }
+
private def testFromLatestOffsets(
topic: String,
addPartitions: Boolean,