aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-11-19 10:48:04 -0800
committerReynold Xin <rxin@databricks.com>2015-11-19 10:48:04 -0800
commitf449992009becc8f7c7f06cda522b9beaa1e263c (patch)
tree324a6d2cf7515810be9eace1b1ced787f0fc372a
parent1a93323c5bab18ed7e55bf6f7b13aae88cb9721c (diff)
downloadspark-f449992009becc8f7c7f06cda522b9beaa1e263c.tar.gz
spark-f449992009becc8f7c7f06cda522b9beaa1e263c.tar.bz2
spark-f449992009becc8f7c7f06cda522b9beaa1e263c.zip
[SPARK-11849][SQL] Analyzer should replace current_date and current_timestamp with literals
We currently rely on the optimizer's constant folding to replace current_timestamp and current_date. However, this can still result in different values for different instances of current_timestamp/current_date if the optimizer is not running fast enough. A better solution is to replace these functions in the analyzer in one shot. Author: Reynold Xin <rxin@databricks.com> Closes #9833 from rxin/SPARK-11849.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala27
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala38
2 files changed, 60 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f00c451b59..84781cd57f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -65,9 +65,8 @@ class Analyzer(
lazy val batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
- CTESubstitution ::
- WindowsSubstitution ::
- Nil : _*),
+ CTESubstitution,
+ WindowsSubstitution),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
@@ -84,7 +83,8 @@ class Analyzer(
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
- PullOutNondeterministic),
+ PullOutNondeterministic,
+ ComputeCurrentTime),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("Cleanup", fixedPoint,
@@ -1076,7 +1076,7 @@ class Analyzer(
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if !p.resolved => p // Skip unresolved nodes.
- case plan => plan transformExpressionsUp {
+ case p => p transformExpressionsUp {
case udf @ ScalaUDF(func, _, inputs, _) =>
val parameterTypes = ScalaReflection.getParameterTypes(func)
@@ -1162,3 +1162,20 @@ object CleanupAliases extends Rule[LogicalPlan] {
}
}
}
+
+/**
+ * Computes the current date and time to make sure we return the same result in a single query.
+ */
+object ComputeCurrentTime extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ val dateExpr = CurrentDate()
+ val timeExpr = CurrentTimestamp()
+ val currentDate = Literal.create(dateExpr.eval(EmptyRow), dateExpr.dataType)
+ val currentTime = Literal.create(timeExpr.eval(EmptyRow), timeExpr.dataType)
+
+ plan transformAllExpressions {
+ case CurrentDate() => currentDate
+ case CurrentTimestamp() => currentTime
+ }
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 08586a9741..e051069951 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
class AnalysisSuite extends AnalysisTest {
@@ -218,4 +219,41 @@ class AnalysisSuite extends AnalysisTest {
udf4)
// checkUDF(udf4, expected4)
}
+
+ test("analyzer should replace current_timestamp with literals") {
+ val in = Project(Seq(Alias(CurrentTimestamp(), "a")(), Alias(CurrentTimestamp(), "b")()),
+ LocalRelation())
+
+ val min = System.currentTimeMillis() * 1000
+ val plan = in.analyze.asInstanceOf[Project]
+ val max = (System.currentTimeMillis() + 1) * 1000
+
+ val lits = new scala.collection.mutable.ArrayBuffer[Long]
+ plan.transformAllExpressions { case e: Literal =>
+ lits += e.value.asInstanceOf[Long]
+ e
+ }
+ assert(lits.size == 2)
+ assert(lits(0) >= min && lits(0) <= max)
+ assert(lits(1) >= min && lits(1) <= max)
+ assert(lits(0) == lits(1))
+ }
+
+ test("analyzer should replace current_date with literals") {
+ val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
+
+ val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
+ val plan = in.analyze.asInstanceOf[Project]
+ val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
+
+ val lits = new scala.collection.mutable.ArrayBuffer[Int]
+ plan.transformAllExpressions { case e: Literal =>
+ lits += e.value.asInstanceOf[Int]
+ e
+ }
+ assert(lits.size == 2)
+ assert(lits(0) >= min && lits(0) <= max)
+ assert(lits(1) >= min && lits(1) <= max)
+ assert(lits(0) == lits(1))
+ }
}