diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2016-04-05 13:18:39 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-04-05 13:18:39 -0700 |
commit | 9ee5c257176d5c7989031d260e74e3eca530c120 (patch) | |
tree | c100eb39f8190d73400cdad6e959744168a0838e /sql/catalyst | |
parent | 72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (diff) | |
download | spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.gz spark-9ee5c257176d5c7989031d260e74e3eca530c120.tar.bz2 spark-9ee5c257176d5c7989031d260e74e3eca530c120.zip |
[SPARK-14353] Dataset Time Window `window` API for Python, and SQL
## What changes were proposed in this pull request?
The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008).
This PR adds the Python, and SQL, API for this function.
With this PR, SQL, Java, and Scala will share the same APIs as in users can use:
- `window(timeColumn, windowDuration)`
- `window(timeColumn, windowDuration, slideDuration)`
- `window(timeColumn, windowDuration, slideDuration, startTime)`
In Python, users can access all APIs above, but in addition they can do
- In Python:
`window(timeColumn, windowDuration, startTime=...)`
that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows.
## How was this patch tested?
Unit tests + manual tests
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #12136 from brkyvz/python-windows.
Diffstat (limited to 'sql/catalyst')
4 files changed, 95 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 7af5ffbe47..1ebdf49348 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -364,7 +364,10 @@ object FunctionRegistry { } Try(f.newInstance(expressions : _*).asInstanceOf[Expression]) match { case Success(e) => e - case Failure(e) => throw new AnalysisException(e.getMessage) + case Failure(e) => + // the exception is an invocation exception. To get a meaningful message, we need the + // cause. + throw new AnalysisException(e.getCause.getMessage) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 8e13833486..daf3de95dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.commons.lang.StringUtils +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} @@ -34,6 +35,28 @@ case class TimeWindow( with Unevaluable with NonSQLExpression { + ////////////////////////// + // SQL Constructors + ////////////////////////// + + def this( + timeColumn: Expression, + windowDuration: Expression, + slideDuration: Expression, + startTime: Expression) = { + this(timeColumn, TimeWindow.parseExpression(windowDuration), + TimeWindow.parseExpression(windowDuration), TimeWindow.parseExpression(startTime)) + } + + def this(timeColumn: Expression, windowDuration: Expression, slideDuration: Expression) = { + this(timeColumn, TimeWindow.parseExpression(windowDuration), + TimeWindow.parseExpression(windowDuration), 0) + } + + def this(timeColumn: Expression, windowDuration: Expression) = { + this(timeColumn, windowDuration, windowDuration) + } + override def child: Expression = timeColumn override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType) override def dataType: DataType = new StructType() @@ -104,6 +127,18 @@ object TimeWindow { cal.microseconds } + /** + * Parses the duration expression to generate the long value for the original constructor so + * that we can use `window` in SQL. + */ + private def parseExpression(expr: Expression): Long = expr match { + case NonNullLiteral(s, StringType) => getIntervalInMicroSeconds(s.toString) + case IntegerLiteral(i) => i.toLong + case NonNullLiteral(l, LongType) => l.toString.toLong + case _ => throw new AnalysisException("The duration and time inputs to window must be " + + "an integer, long or string literal.") + } + def apply( timeColumn: Expression, windowDuration: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 6b7997e903..232ca43588 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -22,6 +22,7 @@ import java.util.UUID import scala.collection.Map import scala.collection.mutable.Stack +import org.apache.commons.lang.ClassUtils import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -365,20 +366,32 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { * @param newArgs the new product arguments. */ def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") { + // Skip no-arg constructors that are just there for kryo. val ctors = getClass.getConstructors.filter(_.getParameterTypes.size != 0) if (ctors.isEmpty) { sys.error(s"No valid constructor for $nodeName") } - val defaultCtor = ctors.maxBy(_.getParameterTypes.size) + val allArgs: Array[AnyRef] = if (otherCopyArgs.isEmpty) { + newArgs + } else { + newArgs ++ otherCopyArgs + } + val defaultCtor = ctors.find { ctor => + if (ctor.getParameterTypes.length != allArgs.length) { + false + } else if (allArgs.contains(null)) { + // if there is a `null`, we can't figure out the class, therefore we should just fallback + // to older heuristic + false + } else { + val argsArray: Array[Class[_]] = allArgs.map(_.getClass) + ClassUtils.isAssignable(argsArray, ctor.getParameterTypes, true /* autoboxing */) + } + }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to older heuristic try { CurrentOrigin.withOrigin(origin) { - // Skip no-arg constructors that are just there for kryo. - if (otherCopyArgs.isEmpty) { - defaultCtor.newInstance(newArgs: _*).asInstanceOf[BaseType] - } else { - defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[BaseType] - } + defaultCtor.newInstance(allArgs.toArray: _*).asInstanceOf[BaseType] } } catch { case e: java.lang.IllegalArgumentException => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala index 71f969aee2..b82cf8d169 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.catalyst.expressions +import org.scalatest.PrivateMethodTester + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.types.LongType -class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper { +class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with PrivateMethodTester { test("time window is unevaluable") { intercept[UnsupportedOperationException] { @@ -73,4 +76,36 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper { === seconds) } } + + private val parseExpression = PrivateMethod[Long]('parseExpression) + + test("parse sql expression for duration in microseconds - string") { + val dur = TimeWindow.invokePrivate(parseExpression(Literal("5 seconds"))) + assert(dur.isInstanceOf[Long]) + assert(dur === 5000000) + } + + test("parse sql expression for duration in microseconds - integer") { + val dur = TimeWindow.invokePrivate(parseExpression(Literal(100))) + assert(dur.isInstanceOf[Long]) + assert(dur === 100) + } + + test("parse sql expression for duration in microseconds - long") { + val dur = TimeWindow.invokePrivate(parseExpression(Literal.create(2 << 52, LongType))) + assert(dur.isInstanceOf[Long]) + assert(dur === (2 << 52)) + } + + test("parse sql expression for duration in microseconds - invalid interval") { + intercept[IllegalArgumentException] { + TimeWindow.invokePrivate(parseExpression(Literal("2 apples"))) + } + } + + test("parse sql expression for duration in microseconds - invalid expression") { + intercept[AnalysisException] { + TimeWindow.invokePrivate(parseExpression(Rand(123))) + } + } } |