author     Wenchen Fan <cloud0fan@outlook.com>    2015-07-08 10:51:32 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-07-08 10:51:32 -0700
commit     0ba98c04c726a827df8cb19b0db17c352a647960 (patch)
tree       5d12dde981c3abe0fde63702e60fa2c4693f6ab0
parent     74335b31072951244967f878d8b766cd1bfc2ac6 (diff)
[SPARK-8753][SQL] Create an IntervalType data type
We need a new data type to represent time intervals. Because we can't determine how many days are in a month, an interval needs two values: an int `months` and a long `microseconds`. The interval literal syntax looks like: `interval 3 years -4 month 4 weeks 3 second`. Because `TimestampType` values are stored as a count of 100ns units, it may not make sense to support a nanosecond unit.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7226 from cloud-fan/interval and squashes the following commits:

632062d [Wenchen Fan] address comments
ac348c3 [Wenchen Fan] use case class
0342d2e [Wenchen Fan] use array byte
df9256c [Wenchen Fan] fix style
fd6f18a [Wenchen Fan] address comments
1856af3 [Wenchen Fan] support interval type
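As a worked example of that two-field representation (a back-of-the-envelope sketch, independent of Spark's code): the literal above decomposes as follows, with the year/month amounts folding into `months` and everything from weeks down folding into `microseconds`.

```scala
// Decomposing `interval 3 years -4 month 4 weeks 3 second` by hand.
val months = 3 * 12 + (-4)                            // 32 months
val microseconds = 4L * 7 * 24 * 3600 * 1000 * 1000 + // 4 weeks
  3L * 1000 * 1000                                    // + 3 seconds
// months = 32, microseconds = 2419203000000
```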
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java    5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala    86
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala    37
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala    3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala    25
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java    47
7 files changed, 185 insertions, 20 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
index e457542c64..d22ad6794d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
@@ -50,6 +50,11 @@ public class DataTypes {
public static final DataType TimestampType = TimestampType$.MODULE$;
/**
+ * Gets the IntervalType object.
+ */
+ public static final DataType IntervalType = IntervalType$.MODULE$;
+
+ /**
* Gets the DoubleType object.
*/
public static final DataType DoubleType = DoubleType$.MODULE$;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index e8e9b9802e..dedd8c8fa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.Interval
/**
* A very simple SQL parser. Based loosely on:
@@ -72,6 +73,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
+ protected val INTERVAL = Keyword("INTERVAL")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
@@ -279,12 +281,12 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
throw new AnalysisException(s"invalid function approximate $udfName")
}
}
- | APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
+ | APPROXIMATE ~> "(" ~> unsignedFloat ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
{ case s ~ _ ~ udfName ~ _ ~ _ ~ exp =>
if (lexical.normalizeKeyword(udfName) == "count") {
ApproxCountDistinct(exp, s.toDouble)
} else {
- throw new AnalysisException(s"invalid function approximate($floatLit) $udfName")
+ throw new AnalysisException(s"invalid function approximate($s) $udfName")
}
}
| CASE ~> whenThenElse ^^ CaseWhen
@@ -309,6 +311,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
( numericLiteral
| booleanLiteral
| stringLit ^^ {case s => Literal.create(s, StringType) }
+ | intervalLiteral
| NULL ^^^ Literal.create(null, NullType)
)
@@ -318,21 +321,71 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
)
protected lazy val numericLiteral: Parser[Literal] =
- signedNumericLiteral | unsignedNumericLiteral
-
- protected lazy val sign: Parser[String] =
- "+" | "-"
-
- protected lazy val signedNumericLiteral: Parser[Literal] =
- ( sign ~ numericLit ^^ { case s ~ l => Literal(toNarrowestIntegerType(s + l)) }
- | sign ~ floatLit ^^ { case s ~ f => Literal((s + f).toDouble) }
+ ( integral ^^ { case i => Literal(toNarrowestIntegerType(i)) }
+ | sign.? ~ unsignedFloat ^^ { case s ~ f => Literal((s.getOrElse("") + f).toDouble) }
)
- protected lazy val unsignedNumericLiteral: Parser[Literal] =
- ( numericLit ^^ { n => Literal(toNarrowestIntegerType(n)) }
- | floatLit ^^ { f => Literal(f.toDouble) }
+ protected lazy val unsignedFloat: Parser[String] =
+ ( "." ~> numericLit ^^ { u => "0." + u }
+ | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
)
+ protected lazy val sign: Parser[String] = ("+" | "-")
+
+ protected lazy val integral: Parser[String] =
+ sign.? ~ numericLit ^^ { case s ~ n => s.getOrElse("") + n }
+
+ private def intervalUnit(unitName: String) =
+ acceptIf {
+ case lexical.Identifier(str) =>
+ val normalized = lexical.normalizeKeyword(str)
+ normalized == unitName || normalized == unitName + "s"
+ case _ => false
+ } {_ => "wrong interval unit"}
+
+ protected lazy val month: Parser[Int] =
+ integral <~ intervalUnit("month") ^^ { case num => num.toInt }
+
+ protected lazy val year: Parser[Int] =
+ integral <~ intervalUnit("year") ^^ { case num => num.toInt * 12 }
+
+ protected lazy val microsecond: Parser[Long] =
+ integral <~ intervalUnit("microsecond") ^^ { case num => num.toLong }
+
+ protected lazy val millisecond: Parser[Long] =
+ integral <~ intervalUnit("millisecond") ^^ { case num => num.toLong * 1000 }
+
+ protected lazy val second: Parser[Long] =
+ integral <~ intervalUnit("second") ^^ { case num => num.toLong * 1000 * 1000 }
+
+ protected lazy val minute: Parser[Long] =
+ integral <~ intervalUnit("minute") ^^ { case num => num.toLong * 1000 * 1000 * 60 }
+
+ protected lazy val hour: Parser[Long] =
+ integral <~ intervalUnit("hour") ^^ { case num => num.toLong * 1000 * 1000 * 3600 }
+
+ protected lazy val day: Parser[Long] =
+ integral <~ intervalUnit("day") ^^ { case num => num.toLong * 1000 * 1000 * 3600 * 24 }
+
+ protected lazy val week: Parser[Long] =
+ integral <~ intervalUnit("week") ^^ { case num => num.toLong * 1000 * 1000 * 3600 * 24 * 7 }
+
+ protected lazy val intervalLiteral: Parser[Literal] =
+ INTERVAL ~> year.? ~ month.? ~ week.? ~ day.? ~ hour.? ~ minute.? ~ second.? ~
+ millisecond.? ~ microsecond.? ^^ {
+ case year ~ month ~ week ~ day ~ hour ~ minute ~ second ~
+ millisecond ~ microsecond =>
+ if (!Seq(year, month, week, day, hour, minute, second,
+ millisecond, microsecond).exists(_.isDefined)) {
+ throw new AnalysisException(
+ "at least one time unit should be given for interval literal")
+ }
+ val months = Seq(year, month).map(_.getOrElse(0)).sum
+ val microseconds = Seq(week, day, hour, minute, second, millisecond, microsecond)
+ .map(_.getOrElse(0L)).sum
+ Literal.create(new Interval(months, microseconds), IntervalType)
+ }
+
private def toNarrowestIntegerType(value: String): Any = {
val bigIntValue = BigDecimal(value)
@@ -343,11 +396,6 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
}
}
- protected lazy val floatLit: Parser[String] =
- ( "." ~> unsignedNumericLiteral ^^ { u => "0." + u }
- | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
- )
-
protected lazy val baseExpression: Parser[Expression] =
( "*" ^^^ UnresolvedStar(None)
| ident <~ "." ~ "*" ^^ { case tableName => UnresolvedStar(Option(tableName)) }
@@ -355,7 +403,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
)
protected lazy val signedPrimary: Parser[Expression] =
- sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e}
+ sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e }
protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
case lexical.Identifier(str) => str
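For readers who want to experiment with the grammar outside Spark, here is a minimal standalone sketch (a simplification, not the patch's actual parser) of the same two-bucket accumulation the `intervalLiteral` rule performs, including the singular/plural unit handling of `intervalUnit`:

```scala
object IntervalParseSketch {
  private val MICROS_PER_SECOND = 1000L * 1000

  // Fixed-length units fold into microseconds; year/month stay separate.
  private val unitToMicros = Map(
    "microsecond" -> 1L,
    "millisecond" -> 1000L,
    "second"      -> MICROS_PER_SECOND,
    "minute"      -> MICROS_PER_SECOND * 60,
    "hour"        -> MICROS_PER_SECOND * 3600,
    "day"         -> MICROS_PER_SECOND * 3600 * 24,
    "week"        -> MICROS_PER_SECOND * 3600 * 24 * 7)

  /** e.g. "3 years -4 month 4 weeks 3 second" => (32, 2419203000000L) */
  def parse(body: String): (Int, Long) =
    body.trim.split("\\s+").grouped(2).foldLeft((0, 0L)) {
      case ((months, micros), Array(num, unit)) =>
        // Accept singular and plural unit names, like intervalUnit above.
        unit.toLowerCase.stripSuffix("s") match {
          case "year"  => (months + num.toInt * 12, micros)
          case "month" => (months + num.toInt, micros)
          case u       => (months, micros + num.toLong * unitToMicros(u))
        }
    }
}
```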
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
new file mode 100644
index 0000000000..87c6e9e6e5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing time intervals.
+ *
+ * Please use the singleton [[DataTypes.IntervalType]].
+ */
+@DeveloperApi
+class IntervalType private() extends DataType {
+
+ override def defaultSize: Int = 4096
+
+ private[spark] override def asNullable: IntervalType = this
+}
+
+case object IntervalType extends IntervalType
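The `case object` makes the singleton itself a valid `DataType` value. For context, a small hypothetical usage snippet (plain `StructType` API, nothing specific to this patch):

```scala
import org.apache.spark.sql.types._

// The IntervalType singleton can appear anywhere a DataType is expected.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("elapsed", IntervalType)))
```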
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
index de4b511edc..2be9b2d76c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -41,7 +41,7 @@ class TimestampType private() extends AtomicType {
private[sql] val ordering = implicitly[Ordering[InternalType]]
/**
- * The default size of a value of the TimestampType is 12 bytes.
+ * The default size of a value of the TimestampType is 8 bytes.
*/
override def defaultSize: Int = 8
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 1f0b93e285..d7440c55bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -304,6 +304,9 @@ private[sql] object ResolvedDataSource {
mode: SaveMode,
options: Map[String, String],
data: DataFrame): ResolvedDataSource = {
+ if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
+ throw new AnalysisException("Cannot save interval data type into external storage.")
+ }
val clazz: Class[_] = lookupDataSource(provider)
val relation = clazz.newInstance() match {
case dataSource: CreatableRelationProvider =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 12ad019e8b..231440892b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1467,4 +1467,29 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
checkAnswer(sql("select count, sort from t"), Row(1, "a"))
}
}
+
+ test("SPARK-8753: add interval type") {
+ import org.apache.spark.unsafe.types.Interval
+
+ val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
+ checkAnswer(df, Row(new Interval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123 )))
+ withTempPath(f => {
+ // Currently we don't yet support saving out values of interval data type.
+ val e = intercept[AnalysisException] {
+ df.write.json(f.getCanonicalPath)
+ }
+ e.message.contains("Cannot save interval data type into external storage")
+ })
+
+ def checkIntervalParseError(s: String): Unit = {
+ val e = intercept[AnalysisException] {
+ sql(s)
+ }
+ e.message.contains("at least one time unit should be given for interval literal")
+ }
+
+ checkIntervalParseError("select interval")
+ // Currently we don't yet support nanosecond
+ checkIntervalParseError("select interval 23 nanosecond")
+ }
}
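The expected `Row` in the test above can be sanity-checked by hand; a quick arithmetic sketch:

```scala
// `interval 3 years -3 month 7 week 123 microseconds`
val months = 12 * 3 - 3                              // 33 months
val micros = 7L * 1000 * 1000 * 3600 * 24 * 7 + 123  // 7 weeks + 123 us
assert(months == 33 && micros == 4233600000123L)
```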
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
new file mode 100644
index 0000000000..3eb67ede06
--- /dev/null
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+
+/**
+ * The internal representation of interval type.
+ */
+public final class Interval implements Serializable {
+ public final int months;
+ public final long microseconds;
+
+ public Interval(int months, long microseconds) {
+ this.months = months;
+ this.microseconds = microseconds;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (other == null || !(other instanceof Interval)) return false;
+
+ Interval o = (Interval) other;
+ return this.months == o.months && this.microseconds == o.microseconds;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * months + (int) microseconds;
+ }
+}
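A short sketch (Scala, calling the new Java class) of the value semantics that `equals` and `hashCode` give the type:

```scala
import org.apache.spark.unsafe.types.Interval

val a = new Interval(33, 4233600000123L)
val b = new Interval(33, 4233600000123L)
assert(a == b)                    // equals compares both fields
assert(a.hashCode == b.hashCode)  // 31 * months + (int) microseconds
assert(new Interval(34, 0L) != new Interval(33, 0L))
```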