author    Burak Yavuz <brkyvz@gmail.com>     2016-04-05 13:18:39 -0700
committer Davies Liu <davies.liu@gmail.com>  2016-04-05 13:18:39 -0700
commit    9ee5c257176d5c7989031d260e74e3eca530c120 (patch)
tree      c100eb39f8190d73400cdad6e959744168a0838e /sql
parent    72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (diff)
[SPARK-14353] Dataset Time Window `window` API for Python, and SQL
## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008). This PR adds the Python and SQL APIs for this function. With this PR, SQL, Java, and Scala share the same APIs, so users can call:

- `window(timeColumn, windowDuration)`
- `window(timeColumn, windowDuration, slideDuration)`
- `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python, users can access all of the APIs above, and in addition they can call:

- `window(timeColumn, windowDuration, startTime=...)`

That is, they can provide the `startTime` without providing the `slideDuration`. In this case, we generate tumbling windows.

## How was this patch tested?

Unit tests + manual tests.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12136 from brkyvz/python-windows.
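For reference, a minimal Scala sketch of the three shared overloads. The DataFrame `df` and its `time` column are hypothetical; the Python keyword-argument form has no separate Scala equivalent.

```scala
import org.apache.spark.sql.functions.window

// Assumes a hypothetical DataFrame `df` with a TimestampType column `time`.
// Tumbling windows: 10-minute windows aligned to the epoch.
df.groupBy(window(df("time"), "10 minutes")).count()

// Sliding windows: 10-minute windows starting every 5 minutes.
df.groupBy(window(df("time"), "10 minutes", "5 minutes")).count()

// Tumbling windows shifted 1 minute past the epoch alignment.
df.groupBy(window(df("time"), "10 minutes", "10 minutes", "1 minute")).count()
```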
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala     5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala       35
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala               27
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala  37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                                  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala               57
6 files changed, 155 insertions(+), 15 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 7af5ffbe47..1ebdf49348 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -364,7 +364,10 @@ object FunctionRegistry {
}
Try(f.newInstance(expressions : _*).asInstanceOf[Expression]) match {
case Success(e) => e
- case Failure(e) => throw new AnalysisException(e.getMessage)
+ case Failure(e) =>
+ // The exception is an invocation exception. To get a meaningful message, we need the
+ // cause.
+ throw new AnalysisException(e.getCause.getMessage)
}
}
}
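Context for the change above: `Constructor.newInstance` wraps any exception thrown inside the constructor in a `java.lang.reflect.InvocationTargetException`, whose own message is typically null, so the meaningful message lives on the cause. A standalone sketch (the `Failing` class is hypothetical):

```scala
import java.lang.reflect.InvocationTargetException

// Hypothetical class whose constructor validates its argument.
class Failing(n: Int) {
  require(n > 0, "n must be positive")
}

val ctor = classOf[Failing].getConstructors.head
try {
  ctor.newInstance(Integer.valueOf(-1))
} catch {
  case e: InvocationTargetException =>
    // e.getMessage is null here; the constructor's message is on the cause.
    println(e.getCause.getMessage) // requirement failed: n must be positive
}
```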
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 8e13833486..daf3de95dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.commons.lang.StringUtils
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -34,6 +35,28 @@ case class TimeWindow(
with Unevaluable
with NonSQLExpression {
+ //////////////////////////
+ // SQL Constructors
+ //////////////////////////
+
+ def this(
+ timeColumn: Expression,
+ windowDuration: Expression,
+ slideDuration: Expression,
+ startTime: Expression) = {
+ this(timeColumn, TimeWindow.parseExpression(windowDuration),
+ TimeWindow.parseExpression(slideDuration), TimeWindow.parseExpression(startTime))
+ }
+
+ def this(timeColumn: Expression, windowDuration: Expression, slideDuration: Expression) = {
+ this(timeColumn, TimeWindow.parseExpression(windowDuration),
+ TimeWindow.parseExpression(slideDuration), 0)
+ }
+
+ def this(timeColumn: Expression, windowDuration: Expression) = {
+ this(timeColumn, windowDuration, windowDuration)
+ }
+
override def child: Expression = timeColumn
override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
override def dataType: DataType = new StructType()
@@ -104,6 +127,18 @@ object TimeWindow {
cal.microseconds
}
+ /**
+ * Parses the duration expression to generate the long value for the original constructor so
+ * that we can use `window` in SQL.
+ */
+ private def parseExpression(expr: Expression): Long = expr match {
+ case NonNullLiteral(s, StringType) => getIntervalInMicroSeconds(s.toString)
+ case IntegerLiteral(i) => i.toLong
+ case NonNullLiteral(l, LongType) => l.toString.toLong
+ case _ => throw new AnalysisException("The duration and time inputs to window must be " +
+ "an integer, long or string literal.")
+ }
+
def apply(
timeColumn: Expression,
windowDuration: String,
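A standalone sketch of the literal-matching shape of `parseExpression`, using plain Scala types in place of Catalyst expressions. The `Lit` hierarchy and `parseInterval` helper are hypothetical stand-ins for `Literal` and `getIntervalInMicroSeconds`:

```scala
// Hypothetical stand-ins for Catalyst literals.
sealed trait Lit
case class StrLit(s: String) extends Lit
case class IntLit(i: Int) extends Lit
case class LongLit(l: Long) extends Lit

// Stand-in for CalendarInterval parsing; only handles "<n> second(s)".
def parseInterval(s: String): Long = {
  val Seconds = """(\d+) seconds?""".r
  s match {
    case Seconds(n) => n.toLong * 1000000L // seconds -> microseconds
    case _ => throw new IllegalArgumentException(s"cannot parse interval: $s")
  }
}

// Mirrors parseExpression: strings go through interval parsing,
// numeric literals are taken as microseconds directly.
def parseDuration(lit: Lit): Long = lit match {
  case StrLit(s)  => parseInterval(s) // "5 seconds" -> 5000000
  case IntLit(i)  => i.toLong
  case LongLit(l) => l
}
```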
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 6b7997e903..232ca43588 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -22,6 +22,7 @@ import java.util.UUID
import scala.collection.Map
import scala.collection.mutable.Stack
+import org.apache.commons.lang.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
@@ -365,20 +366,32 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
* @param newArgs the new product arguments.
*/
def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") {
+ // Skip no-arg constructors that are just there for kryo.
val ctors = getClass.getConstructors.filter(_.getParameterTypes.size != 0)
if (ctors.isEmpty) {
sys.error(s"No valid constructor for $nodeName")
}
- val defaultCtor = ctors.maxBy(_.getParameterTypes.size)
+ val allArgs: Array[AnyRef] = if (otherCopyArgs.isEmpty) {
+ newArgs
+ } else {
+ newArgs ++ otherCopyArgs
+ }
+ val defaultCtor = ctors.find { ctor =>
+ if (ctor.getParameterTypes.length != allArgs.length) {
+ false
+ } else if (allArgs.contains(null)) {
+ // If there is a `null`, we can't figure out the class, so we just fall back
+ // to the older heuristic.
+ false
+ } else {
+ val argsArray: Array[Class[_]] = allArgs.map(_.getClass)
+ ClassUtils.isAssignable(argsArray, ctor.getParameterTypes, true /* autoboxing */)
+ }
+ }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to the older heuristic
try {
CurrentOrigin.withOrigin(origin) {
- // Skip no-arg constructors that are just there for kryo.
- if (otherCopyArgs.isEmpty) {
- defaultCtor.newInstance(newArgs: _*).asInstanceOf[BaseType]
- } else {
- defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[BaseType]
- }
+ defaultCtor.newInstance(allArgs.toArray: _*).asInstanceOf[BaseType]
}
} catch {
case e: java.lang.IllegalArgumentException =>
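The constructor matching above can be exercised on its own; a minimal sketch with a hypothetical `Node` class (assumes commons-lang on the classpath, as in Spark):

```scala
import org.apache.commons.lang.ClassUtils

// Hypothetical class with two same-arity constructors of different types.
class Node(val name: String, val count: Int) {
  def this(name: String, child: Node) = this(name, 0)
}

val args: Array[AnyRef] = Array("a", Integer.valueOf(3))
val argClasses: Array[Class[_]] = args.map(_.getClass)

// Pick the constructor whose parameter types accept the runtime argument
// classes, allowing autoboxing (Integer -> int), as makeCopy now does.
val ctor = classOf[Node].getConstructors.find { c =>
  c.getParameterTypes.length == args.length &&
    ClassUtils.isAssignable(argClasses, c.getParameterTypes, true)
}.get

val node = ctor.newInstance(args: _*).asInstanceOf[Node]
```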
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
index 71f969aee2..b82cf8d169 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.catalyst.expressions
+import org.scalatest.PrivateMethodTester
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types.LongType
-class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper {
+class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with PrivateMethodTester {
test("time window is unevaluable") {
intercept[UnsupportedOperationException] {
@@ -73,4 +76,36 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper {
=== seconds)
}
}
+
+ private val parseExpression = PrivateMethod[Long]('parseExpression)
+
+ test("parse sql expression for duration in microseconds - string") {
+ val dur = TimeWindow.invokePrivate(parseExpression(Literal("5 seconds")))
+ assert(dur.isInstanceOf[Long])
+ assert(dur === 5000000)
+ }
+
+ test("parse sql expression for duration in microseconds - integer") {
+ val dur = TimeWindow.invokePrivate(parseExpression(Literal(100)))
+ assert(dur.isInstanceOf[Long])
+ assert(dur === 100)
+ }
+
+ test("parse sql expression for duration in microseconds - long") {
+ val dur = TimeWindow.invokePrivate(parseExpression(Literal.create(2L << 52, LongType)))
+ assert(dur.isInstanceOf[Long])
+ assert(dur === (2L << 52))
+ }
+
+ test("parse sql expression for duration in microseconds - invalid interval") {
+ intercept[IllegalArgumentException] {
+ TimeWindow.invokePrivate(parseExpression(Literal("2 apples")))
+ }
+ }
+
+ test("parse sql expression for duration in microseconds - invalid expression") {
+ intercept[AnalysisException] {
+ TimeWindow.invokePrivate(parseExpression(Rand(123)))
+ }
+ }
}
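For readers unfamiliar with the pattern used above: ScalaTest's `PrivateMethodTester` invokes a private method by name via reflection, which is how the suite reaches `TimeWindow.parseExpression`. A minimal standalone sketch (the `Adder` object is hypothetical):

```scala
import org.scalatest.PrivateMethodTester

object Adder {
  private def add(a: Int, b: Int): Int = a + b
}

object Demo extends PrivateMethodTester {
  // Bind the private method by name, typed by its result.
  private val add = PrivateMethod[Int]('add)
  // invokePrivate reflectively calls Adder.add(1, 2).
  val sum: Int = Adder.invokePrivate(add(1, 2)) // 3
}
```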
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index da58ba2add..5bc0034cb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2574,8 +2574,7 @@ object functions {
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
- * The time can be as TimestampType or LongType, however when using LongType,
- * the time must be given in seconds.
+ * The time column must be of TimestampType.
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
* valid duration identifiers.
@@ -2629,8 +2628,7 @@ object functions {
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
- * The time can be as TimestampType or LongType, however when using LongType,
- * the time must be given in seconds.
+ * The time column must be of TimestampType.
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
* valid duration identifiers.
@@ -2672,8 +2670,7 @@ object functions {
* processing time.
*
* @param timeColumn The column or the expression to use as the timestamp for windowing by time.
- * The time can be as TimestampType or LongType, however when using LongType,
- * the time must be given in seconds.
+ * The time column must be of TimestampType.
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
* valid duration identifiers.
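With the stricter contract documented above, a LongType column of epoch seconds now needs an explicit cast before windowing. A hedged sketch (`df` and the `epoch_seconds` column are hypothetical):

```scala
import org.apache.spark.sql.functions.window

// Hypothetical DataFrame `df` with epoch seconds stored in a LongType column.
// Casting a long to timestamp interprets the value as seconds since the epoch.
val withTs = df.withColumn("ts", df("epoch_seconds").cast("timestamp"))
withTs.groupBy(window(withTs("ts"), "10 minutes")).count()
```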
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index e8103a31d5..06584ec21e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -239,4 +239,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
Row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1))
)
}
+
+ private def withTempTable(f: String => Unit): Unit = {
+ val tableName = "temp"
+ Seq(
+ ("2016-03-27 19:39:34", 1),
+ ("2016-03-27 19:39:56", 2),
+ ("2016-03-27 19:39:27", 4)).toDF("time", "value").registerTempTable(tableName)
+ try {
+ f(tableName)
+ } finally {
+ sqlContext.dropTempTable(tableName)
+ }
+ }
+
+ test("time window in SQL with single string expression") {
+ withTempTable { table =>
+ checkAnswer(
+ sqlContext.sql(s"""select window(time, "10 seconds"), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+ Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+ )
+ )
+ }
+ }
+
+ test("time window in SQL with with two expressions") {
+ withTempTable { table =>
+ checkAnswer(
+ sqlContext.sql(
+ s"""select window(time, "10 seconds", 10000000), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
+ Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
+ Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2)
+ )
+ )
+ }
+ }
+
+ test("time window in SQL with with three expressions") {
+ withTempTable { table =>
+ checkAnswer(
+ sqlContext.sql(
+ s"""select window(time, "10 seconds", 10000000, "5 seconds"), value from $table""")
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
+ Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
+ Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2)
+ )
+ )
+ }
+ }
}