author     Burak Yavuz <brkyvz@gmail.com>          2016-04-01 13:19:24 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-04-01 13:19:24 -0700
commit  1b829ce13990b40fd8d7c9efcc2ae55c4dbc861c
tree    3dd5b6bfd14b9eafde58bed77dc89ae43712a599 /sql/catalyst
parent  1e886159849e3918445d3fdc3c4cef86c6c1a236
[SPARK-14160] Time Windowing functions for Datasets
## What changes were proposed in this pull request?

This PR adds the function `window` as a column expression. `window` can be used to bucket rows into time windows given a time column. With this expression, time series analysis on both batch and streaming data becomes much simpler.

### Usage

Assume the following schema: `sensor_id, measurement, timestamp`

To average over 5-minute windows every 1 minute (window length of 5 minutes, slide duration of 1 minute), we use:

```scala
df.groupBy(window("timestamp", "5 minutes", "1 minute"), "sensor_id")
  .agg(mean("measurement").as("avg_meas"))
```

This will generate windows such as:

```
09:00:00-09:05:00
09:01:00-09:06:00
09:02:00-09:07:00
...
```

Intervals start at every `slideDuration`, aligned to the Unix epoch (1970-01-01 00:00:00 UTC). To start intervals at a different point in time, e.g. 30 seconds past the minute, the `startTime` parameter can be used:

```scala
df.groupBy(window("timestamp", "5 minutes", "1 minute", "30 second"), "sensor_id")
  .agg(mean("measurement").as("avg_meas"))
```

This will generate windows such as:

```
09:00:30-09:05:30
09:01:30-09:06:30
09:02:30-09:07:30
...
```

Support for Python will be added in a follow-up PR.

## How was this patch tested?

This patch has basic unit tests for the `TimeWindow` expression verifying that the parameters pass validation, as well as unit/integration tests checking the correctness of the windowing and its usability in complex operations (multi-column grouping, multi-column projections, joins).

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #12008 from brkyvz/df-time-window.
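For intuition, the bucketing arithmetic can be sketched in a few lines of plain Scala. This is a minimal sketch with a hypothetical helper name (`windowsFor`), not the Catalyst implementation; times are whole seconds since the Unix epoch, while the real expression works in microseconds:

```scala
// Hypothetical helper: all windows [start, end) of length `win`, sliding every
// `slide` and offset by `start0` from the epoch, that contain timestamp `ts`.
def windowsFor(ts: Long, win: Long, slide: Long, start0: Long): Seq[(Long, Long)] = {
  val maxOverlapping = math.ceil(win.toDouble / slide).toInt
  val windowId = math.ceil((ts - start0).toDouble / slide).toLong
  (0 to maxOverlapping).map { i =>
    val start = (windowId + i - maxOverlapping) * slide + start0
    (start, start + win)                        // candidate window
  }.filter { case (s, e) => ts >= s && ts < e } // keep only windows containing ts
}

// 09:02:30 with a 5-minute window sliding every minute falls into five windows,
// 08:58:00-09:03:00 through 09:02:00-09:07:00:
// windowsFor(9L * 3600 + 150, 300, 60, 0)
```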
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala            |  90
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala    |   1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala       | 133
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala  |  56
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala  |  76
5 files changed, 356 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8dc0532b3f..d82ee3a205 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -102,6 +102,7 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
+ TimeWindowing ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
@@ -1591,3 +1592,92 @@ object ResolveUpCast extends Rule[LogicalPlan] {
}
}
}
+
+/**
+ * Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
+ * figure out how many windows a time column can map to, we over-estimate the number of windows and
+ * filter out the rows where the time column is not inside the time window.
+ */
+object TimeWindowing extends Rule[LogicalPlan] {
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+
+ private final val WINDOW_START = "start"
+ private final val WINDOW_END = "end"
+
+ /**
+ * Generates the logical plan for generating window ranges on a timestamp column. Without
+ * knowing what the timestamp value is, it's non-trivial to figure out deterministically how many
+ * window ranges a timestamp will map to given all possible combinations of a window duration,
+ * slide duration and start time (offset). Therefore, we over-estimate the number of windows
+ * there may be, and filter out the invalid ones. We use the last Project operator to group
+ * the window columns into a struct so they can be accessed as `window.start` and `window.end`.
+ *
+ * The windows are calculated as below:
+ * maxNumOverlapping <- ceil(windowDuration / slideDuration)
+ * for (i <- 0 until maxNumOverlapping)
+ * windowId <- ceil((timestamp - startTime) / slideDuration)
+ * windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
+ * windowEnd <- windowStart + windowDuration
+ * return windowStart, windowEnd
+ *
+ * For example, this is the behavior for the timestamp 12:05 with the parameters below. The
+ * valid windows are marked with a +, and the invalid ones are marked with an x. The invalid
+ * ones are removed by the Filter operator.
+ * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
+ * 11:55 - 12:07 + 11:52 - 12:04 x
+ * 12:00 - 12:12 + 11:57 - 12:09 +
+ * 12:05 - 12:17 + 12:02 - 12:14 +
+ *
+ * @param plan The logical plan
+ * @return the logical plan that will generate the time windows using the Expand operator, with
+ * the Filter operator for correctness and Project for usability.
+ */
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case p: LogicalPlan if p.children.size == 1 =>
+ val child = p.children.head
+ val windowExpressions =
+ p.expressions.flatMap(_.collect { case t: TimeWindow => t }).distinct.toList // Not correct.
+
+ // Only support a single window expression for now
+ if (windowExpressions.size == 1 &&
+ windowExpressions.head.timeColumn.resolved &&
+ windowExpressions.head.checkInputDataTypes().isSuccess) {
+ val window = windowExpressions.head
+ val windowAttr = AttributeReference("window", window.dataType)()
+
+ val maxNumOverlapping = math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+ val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>
+ val windowId = Ceil((PreciseTimestamp(window.timeColumn) - window.startTime) /
+ window.slideDuration)
+ val windowStart = (windowId + i - maxNumOverlapping) *
+ window.slideDuration + window.startTime
+ val windowEnd = windowStart + window.windowDuration
+
+ CreateNamedStruct(
+ Literal(WINDOW_START) :: windowStart ::
+ Literal(WINDOW_END) :: windowEnd :: Nil)
+ }
+
+ val projections = windows.map(_ +: child.output)
+
+ val filterExpr =
+ window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+ window.timeColumn < windowAttr.getField(WINDOW_END)
+
+ val expandedPlan =
+ Filter(filterExpr,
+ Expand(projections, windowAttr +: child.output, child))
+
+ val substitutedPlan = p transformExpressions {
+ case _: TimeWindow => windowAttr
+ }
+
+ substitutedPlan.withNewChildren(expandedPlan :: Nil)
+ } else if (windowExpressions.size > 1) {
+ p.failAnalysis("Multiple time window expressions would result in a cartesian product " +
+ "of rows, therefore they are currently not supported.")
+ } else {
+ p // Return unchanged. The analyzer will throw an exception later.
+ }
+ }
+}
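As a rough mental model of what this rule produces: Expand emits `maxNumOverlapping + 1` candidate rows per input row, and Filter drops the candidates whose window does not contain the timestamp. Below is a collection-level sketch under those assumptions, with hypothetical `Row` and `Windowed` types in plain Scala rather than a Catalyst plan:

```scala
// Hypothetical analogue of Expand + Filter over an in-memory collection.
case class Row(ts: Long, measurement: Double)
case class Windowed(start: Long, end: Long, row: Row)

def expandAndFilter(rows: Seq[Row], win: Long, slide: Long, start0: Long): Seq[Windowed] = {
  val maxOverlapping = math.ceil(win.toDouble / slide).toInt
  rows.flatMap { r =>
    // Expand: one candidate row per possibly-overlapping window.
    val windowId = math.ceil((r.ts - start0).toDouble / slide).toLong
    (0 to maxOverlapping).map { i =>
      val start = (windowId + i - maxOverlapping) * slide + start0
      Windowed(start, start + win, r)
    }
  }.filter(w => w.row.ts >= w.start && w.row.ts < w.end) // Filter: drop invalid candidates
}
```

Grouping the result by `(start, end)` then corresponds to the `groupBy(window(...))` aggregation shown in the commit message.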
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e9788b7e4d..ca8db3cbc5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -297,6 +297,7 @@ object FunctionRegistry {
expression[UnixTimestamp]("unix_timestamp"),
expression[WeekOfYear]("weekofyear"),
expression[Year]("year"),
+ expression[TimeWindow]("window"),
// collection functions
expression[ArrayContains]("array_contains"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
new file mode 100644
index 0000000000..8e13833486
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.commons.lang.StringUtils
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+case class TimeWindow(
+ timeColumn: Expression,
+ windowDuration: Long,
+ slideDuration: Long,
+ startTime: Long) extends UnaryExpression
+ with ImplicitCastInputTypes
+ with Unevaluable
+ with NonSQLExpression {
+
+ override def child: Expression = timeColumn
+ override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
+ override def dataType: DataType = new StructType()
+ .add(StructField("start", TimestampType))
+ .add(StructField("end", TimestampType))
+
+ // This expression is replaced in the analyzer.
+ override lazy val resolved = false
+
+ /**
+ * Validate the inputs for the window duration, slide duration, and start time in addition to
+ * the input data type.
+ */
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val dataTypeCheck = super.checkInputDataTypes()
+ if (dataTypeCheck.isSuccess) {
+ if (windowDuration <= 0) {
+ return TypeCheckFailure(s"The window duration ($windowDuration) must be greater than 0.")
+ }
+ if (slideDuration <= 0) {
+ return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
+ }
+ if (startTime < 0) {
+ return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.")
+ }
+ if (slideDuration > windowDuration) {
+ return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
+ s" to the windowDuration ($windowDuration).")
+ }
+ if (startTime >= slideDuration) {
+ return TypeCheckFailure(s"The start time ($startTime) must be less than the " +
+ s"slideDuration ($slideDuration).")
+ }
+ }
+ dataTypeCheck
+ }
+}
+
+object TimeWindow {
+ /**
+ * Parses the interval string for a valid time duration. CalendarInterval expects interval
+ * strings to start with the string `interval`. For usability, we prepend `interval` to the string
+ * if the user omitted it.
+ *
+ * @param interval The interval string
+ * @return The interval duration in microseconds. Spark SQL's TimestampType has microsecond
+ * precision, so durations are stored in microseconds as well.
+ */
+ private def getIntervalInMicroSeconds(interval: String): Long = {
+ if (StringUtils.isBlank(interval)) {
+ throw new IllegalArgumentException(
+ "The window duration, slide duration and start time cannot be null or blank.")
+ }
+ val intervalString = if (interval.startsWith("interval")) {
+ interval
+ } else {
+ "interval " + interval
+ }
+ val cal = CalendarInterval.fromString(intervalString)
+ if (cal == null) {
+ throw new IllegalArgumentException(
+ s"The provided interval ($interval) did not correspond to a valid interval string.")
+ }
+ if (cal.months > 0) {
+ throw new IllegalArgumentException(
+ s"Intervals greater than or equal to a month are not supported ($interval).")
+ }
+ cal.microseconds
+ }
+
+ def apply(
+ timeColumn: Expression,
+ windowDuration: String,
+ slideDuration: String,
+ startTime: String): TimeWindow = {
+ TimeWindow(timeColumn,
+ getIntervalInMicroSeconds(windowDuration),
+ getIntervalInMicroSeconds(slideDuration),
+ getIntervalInMicroSeconds(startTime))
+ }
+}
+
+/**
+ * Expression used internally to convert the TimestampType to Long without losing
+ * precision, i.e. in microseconds. Used in time windowing.
+ */
+case class PreciseTimestamp(child: Expression) extends UnaryExpression with ExpectsInputTypes {
+ override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
+ override def dataType: DataType = LongType
+ override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
+ val eval = child.gen(ctx)
+ eval.code +
+ s"""boolean ${ev.isNull} = ${eval.isNull};
+ |${ctx.javaType(dataType)} ${ev.value} = ${eval.value};
+ """.stripMargin
+ }
+}
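The parsing contract of `getIntervalInMicroSeconds` can be illustrated directly with `CalendarInterval`, which the file above already imports. This is a minimal sketch; the literal values simply follow from sub-month durations being stored in microseconds:

```scala
import org.apache.spark.unsafe.types.CalendarInterval

// fromString requires the "interval" prefix, which TimeWindow prepends when absent.
val cal = CalendarInterval.fromString("interval 5 minutes")
assert(cal != null)                            // null signals an unparseable string
assert(cal.months == 0)                        // month-valued intervals are rejected by TimeWindow
assert(cal.microseconds == 5L * 60 * 1000000)  // 300,000,000 microseconds

// TimeWindow(col, "5 minutes", "1 minute", "0 seconds") therefore stores
// windowDuration = 300000000, slideDuration = 60000000, startTime = 0.
```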
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a90dfc5039..ad101d1c40 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -272,6 +272,62 @@ class AnalysisErrorSuite extends AnalysisTest {
testRelation2.where('bad_column > 1).groupBy('a)(UnresolvedAlias(max('b))),
"cannot resolve '`bad_column`'" :: Nil)
+ errorTest(
+ "slide duration greater than window in time window",
+ testRelation2.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "2 second", "0 second").as("window")),
+ s"The slide duration " :: " must be less than or equal to the windowDuration " :: Nil
+ )
+
+ errorTest(
+ "start time greater than slide duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")),
+ "The start time " :: " must be less than the slideDuration " :: Nil
+ )
+
+ errorTest(
+ "start time equal to slide duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")),
+ "The start time " :: " must be less than the slideDuration " :: Nil
+ )
+
+ errorTest(
+ "negative window duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "-1 second", "1 second", "0 second").as("window")),
+ "The window duration " :: " must be greater than 0." :: Nil
+ )
+
+ errorTest(
+ "zero window duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "0 second", "1 second", "0 second").as("window")),
+ "The window duration " :: " must be greater than 0." :: Nil
+ )
+
+ errorTest(
+ "negative slide duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "-1 second", "0 second").as("window")),
+ "The slide duration " :: " must be greater than 0." :: Nil
+ )
+
+ errorTest(
+ "zero slide duration in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "0 second", "0 second").as("window")),
+ "The slide duration" :: " must be greater than 0." :: Nil
+ )
+
+ errorTest(
+ "negative start time in time window",
+ testRelation.select(
+ TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")),
+ "The start time" :: "must be greater than or equal to 0." :: Nil
+ )
+
test("SPARK-6452 regression test") {
// CheckAnalysis should throw AnalysisException when Aggregate contains missing attribute(s)
// Since we manually construct the logical plan at here and Sum only accept
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
new file mode 100644
index 0000000000..71f969aee2
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+
+class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper {
+
+ test("time window is unevaluable") {
+ intercept[UnsupportedOperationException] {
+ evaluate(TimeWindow(Literal(10L), "1 second", "1 second", "0 second"))
+ }
+ }
+
+ private def checkErrorMessage(msg: String, value: String): Unit = {
+ val validDuration = "10 second"
+ val validTime = "5 second"
+ val e1 = intercept[IllegalArgumentException] {
+ TimeWindow(Literal(10L), value, validDuration, validTime).windowDuration
+ }
+ val e2 = intercept[IllegalArgumentException] {
+ TimeWindow(Literal(10L), validDuration, value, validTime).slideDuration
+ }
+ val e3 = intercept[IllegalArgumentException] {
+ TimeWindow(Literal(10L), validDuration, validDuration, value).startTime
+ }
+ Seq(e1, e2, e3).foreach { e =>
+ assert(e.getMessage.contains(msg))
+ }
+ }
+
+ test("blank intervals throw exception") {
+ for (blank <- Seq(null, " ", "\n", "\t")) {
+ checkErrorMessage(
+ "The window duration, slide duration and start time cannot be null or blank.", blank)
+ }
+ }
+
+ test("invalid intervals throw exception") {
+ checkErrorMessage(
+ "did not correspond to a valid interval string.", "2 apples")
+ }
+
+ test("intervals greater than a month throws exception") {
+ checkErrorMessage(
+ "Intervals greater than or equal to a month is not supported (1 month).", "1 month")
+ }
+
+ test("interval strings work with and without 'interval' prefix and return microseconds") {
+ val validDuration = "10 second"
+ for ((text, seconds) <- Seq(
+ ("1 second", 1000000), // 1e6
+ ("1 minute", 60000000), // 6e7
+ ("2 hours", 7200000000L))) { // 72e9
+ assert(TimeWindow(Literal(10L), text, validDuration, "0 seconds").windowDuration === seconds)
+ assert(TimeWindow(Literal(10L), "interval " + text, validDuration, "0 seconds").windowDuration
+ === seconds)
+ }
+ }
+}