From 0ba98c04c726a827df8cb19b0db17c352a647960 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 8 Jul 2015 10:51:32 -0700
Subject: [SPARK-8753][SQL] Create an IntervalType data type

We need a new data type to represent time intervals. Because we can't
determine how many days are in a month, an interval needs two values: an int
`months` and a long `microseconds`.

The interval literal syntax looks like:
`interval 3 years -4 month 4 weeks 3 second`

Because `TimestampType` values are stored as a number of 100ns units, it may
not make sense to support a nanosecond unit.

Author: Wenchen Fan

Closes #7226 from cloud-fan/interval and squashes the following commits:

632062d [Wenchen Fan] address comments
ac348c3 [Wenchen Fan] use case class
0342d2e [Wenchen Fan] use array byte
df9256c [Wenchen Fan] fix style
fd6f18a [Wenchen Fan] address comments
1856af3 [Wenchen Fan] support interval type
---
 .../java/org/apache/spark/sql/types/DataTypes.java |  5 ++
 .../org/apache/spark/sql/catalyst/SqlParser.scala  | 86 +++++++++++++++++-----
 .../org/apache/spark/sql/types/IntervalType.scala  | 37 ++++++++++
 .../org/apache/spark/sql/types/TimestampType.scala |  2 +-
 .../scala/org/apache/spark/sql/sources/ddl.scala   |  3 +
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 25 +++++++
 .../org/apache/spark/unsafe/types/Interval.java    | 47 ++++++++++++
 7 files changed, 185 insertions(+), 20 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
 create mode 100644 unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
index e457542c64..d22ad6794d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
@@ -49,6 +49,11 @@ public class DataTypes {
    */
   public static final DataType TimestampType = TimestampType$.MODULE$;
 
+  /**
+   * Gets the IntervalType object.
+   */
+  public static final DataType IntervalType = IntervalType$.MODULE$;
+
   /**
    * Gets the DoubleType object.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index e8e9b9802e..dedd8c8fa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.Interval
 
 /**
  * A very simple SQL parser.  Based loosely on:
@@ -72,6 +73,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected val INNER = Keyword("INNER")
   protected val INSERT = Keyword("INSERT")
   protected val INTERSECT = Keyword("INTERSECT")
+  protected val INTERVAL = Keyword("INTERVAL")
   protected val INTO = Keyword("INTO")
   protected val IS = Keyword("IS")
   protected val JOIN = Keyword("JOIN")
@@ -279,12 +281,12 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
           throw new AnalysisException(s"invalid function approximate $udfName")
         }
       }
-    | APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
+    | APPROXIMATE ~> "(" ~> unsignedFloat ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
       { case s ~ _ ~ udfName ~ _ ~ _ ~ exp =>
         if (lexical.normalizeKeyword(udfName) == "count") {
           ApproxCountDistinct(exp, s.toDouble)
         } else {
-          throw new AnalysisException(s"invalid function approximate($floatLit) $udfName")
+          throw new AnalysisException(s"invalid function approximate($s) $udfName")
         }
       }
     | CASE ~> whenThenElse ^^ CaseWhen
@@ -309,6 +311,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     ( numericLiteral
     | booleanLiteral
     | stringLit ^^ {case s => Literal.create(s, StringType) }
+    | intervalLiteral
     | NULL ^^^ Literal.create(null, NullType)
     )
 
@@ -318,21 +321,71 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     )
 
   protected lazy val numericLiteral: Parser[Literal] =
-    signedNumericLiteral | unsignedNumericLiteral
-
-  protected lazy val sign: Parser[String] =
-    "+" | "-"
-
-  protected lazy val signedNumericLiteral: Parser[Literal] =
-    ( sign ~ numericLit ^^ { case s ~ l => Literal(toNarrowestIntegerType(s + l)) }
-    | sign ~ floatLit ^^ { case s ~ f => Literal((s + f).toDouble) }
+    ( integral ^^ { case i => Literal(toNarrowestIntegerType(i)) }
+    | sign.? ~ unsignedFloat ^^ { case s ~ f => Literal((s.getOrElse("") + f).toDouble) }
     )
 
-  protected lazy val unsignedNumericLiteral: Parser[Literal] =
-    ( numericLit ^^ { n => Literal(toNarrowestIntegerType(n)) }
-    | floatLit ^^ { f => Literal(f.toDouble) }
+  protected lazy val unsignedFloat: Parser[String] =
+    ( "." ~> numericLit ^^ { u => "0." + u }
+    | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
     )
 
+  protected lazy val sign: Parser[String] = ("+" | "-")
+
+  protected lazy val integral: Parser[String] =
+    sign.? ~ numericLit ^^ { case s ~ n => s.getOrElse("") + n }
+
+  private def intervalUnit(unitName: String) =
+    acceptIf {
+      case lexical.Identifier(str) =>
+        val normalized = lexical.normalizeKeyword(str)
+        normalized == unitName || normalized == unitName + "s"
+      case _ => false
+    } {_ => "wrong interval unit"}
+
+  protected lazy val month: Parser[Int] =
+    integral <~ intervalUnit("month") ^^ { case num => num.toInt }
+
+  protected lazy val year: Parser[Int] =
+    integral <~ intervalUnit("year") ^^ { case num => num.toInt * 12 }
+
+  protected lazy val microsecond: Parser[Long] =
+    integral <~ intervalUnit("microsecond") ^^ { case num => num.toLong }
+
+  protected lazy val millisecond: Parser[Long] =
+    integral <~ intervalUnit("millisecond") ^^ { case num => num.toLong * 1000 }
+
+  protected lazy val second: Parser[Long] =
+    integral <~ intervalUnit("second") ^^ { case num => num.toLong * 1000 * 1000 }
+
+  protected lazy val minute: Parser[Long] =
+    integral <~ intervalUnit("minute") ^^ { case num => num.toLong * 1000 * 1000 * 60 }
+
+  protected lazy val hour: Parser[Long] =
+    integral <~ intervalUnit("hour") ^^ { case num => num.toLong * 1000 * 1000 * 3600 }
+
+  protected lazy val day: Parser[Long] =
+    integral <~ intervalUnit("day") ^^ { case num => num.toLong * 1000 * 1000 * 3600 * 24 }
+
+  protected lazy val week: Parser[Long] =
+    integral <~ intervalUnit("week") ^^ { case num => num.toLong * 1000 * 1000 * 3600 * 24 * 7 }
+
+  protected lazy val intervalLiteral: Parser[Literal] =
+    INTERVAL ~> year.? ~ month.? ~ week.? ~ day.? ~ hour.? ~ minute.? ~ second.? ~
+      millisecond.? ~ microsecond.? ^^ {
+        case year ~ month ~ week ~ day ~ hour ~ minute ~ second ~
+          millisecond ~ microsecond =>
+          if (!Seq(year, month, week, day, hour, minute, second,
+            millisecond, microsecond).exists(_.isDefined)) {
+            throw new AnalysisException(
+              "at least one time unit should be given for interval literal")
+          }
+          val months = Seq(year, month).map(_.getOrElse(0)).sum
+          val microseconds = Seq(week, day, hour, minute, second, millisecond, microsecond)
+            .map(_.getOrElse(0L)).sum
+          Literal.create(new Interval(months, microseconds), IntervalType)
+      }
+
   private def toNarrowestIntegerType(value: String): Any = {
     val bigIntValue = BigDecimal(value)
 
@@ -343,11 +396,6 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     }
   }
 
-  protected lazy val floatLit: Parser[String] =
-    ( "." ~> unsignedNumericLiteral ^^ { u => "0." + u }
-    | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
-    )
-
   protected lazy val baseExpression: Parser[Expression] =
     ( "*" ^^^ UnresolvedStar(None)
     | ident <~ "." ~ "*" ^^ { case tableName => UnresolvedStar(Option(tableName)) }
@@ -355,7 +403,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     )
 
   protected lazy val signedPrimary: Parser[Expression] =
-    sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e}
+    sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e }
 
   protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
     case lexical.Identifier(str) => str
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
new file mode 100644
index 0000000000..87c6e9e6e5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing time intervals.
+ *
+ * Please use the singleton [[DataTypes.IntervalType]].
+ */
+@DeveloperApi
+class IntervalType private() extends DataType {
+
+  override def defaultSize: Int = 4096
+
+  private[spark] override def asNullable: IntervalType = this
+}
+
+case object IntervalType extends IntervalType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
index de4b511edc..2be9b2d76c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -41,7 +41,7 @@ class TimestampType private() extends AtomicType {
   private[sql] val ordering = implicitly[Ordering[InternalType]]
 
   /**
-   * The default size of a value of the TimestampType is 12 bytes.
+   * The default size of a value of the TimestampType is 8 bytes.
    */
   override def defaultSize: Int = 8
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 1f0b93e285..d7440c55bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -304,6 +304,9 @@ private[sql] object ResolvedDataSource {
       mode: SaveMode,
       options: Map[String, String],
       data: DataFrame): ResolvedDataSource = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
     val clazz: Class[_] = lookupDataSource(provider)
     val relation = clazz.newInstance() match {
       case dataSource: CreatableRelationProvider =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 12ad019e8b..231440892b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1467,4 +1467,29 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
       checkAnswer(sql("select count, sort from t"), Row(1, "a"))
     }
   }
+
+  test("SPARK-8753: add interval type") {
+    import org.apache.spark.unsafe.types.Interval
+
+    val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
+    checkAnswer(df, Row(new Interval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123)))
+    withTempPath(f => {
+      // Currently we don't yet support saving out values of interval data type.
+      val e = intercept[AnalysisException] {
+        df.write.json(f.getCanonicalPath)
+      }
+      e.message.contains("Cannot save interval data type into external storage")
+    })
+
+    def checkIntervalParseError(s: String): Unit = {
+      val e = intercept[AnalysisException] {
+        sql(s)
+      }
+      e.message.contains("at least one time unit should be given for interval literal")
+    }
+
+    checkIntervalParseError("select interval")
+    // Currently we don't yet support nanosecond
+    checkIntervalParseError("select interval 23 nanosecond")
+  }
 }
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
new file mode 100644
index 0000000000..3eb67ede06
--- /dev/null
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+
+/**
+ * The internal representation of interval type.
+ */
+public final class Interval implements Serializable {
+  public final int months;
+  public final long microseconds;
+
+  public Interval(int months, long microseconds) {
+    this.months = months;
+    this.microseconds = microseconds;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) return true;
+    if (other == null || !(other instanceof Interval)) return false;
+
+    Interval o = (Interval) other;
+    return this.months == o.months && this.microseconds == o.microseconds;
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * months + (int) microseconds;
+  }
+}
-- 
cgit v1.2.3
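
To see the two-field representation from the commit message in isolation, here is a
minimal standalone Scala sketch, with no Spark dependency; the `IntervalSketch` object
and its constant names are illustrative, not part of this patch. It folds the commit
message's example literal, `interval 3 years -4 month 4 weeks 3 second`, the same way
the parser rules above do: calendar-based units (years, months) accumulate into an int
`months`, while fixed-length units (weeks down to microseconds) accumulate into a long
`microseconds`.

    // Illustrative sketch only; mirrors the unit conversions in SqlParser above.
    object IntervalSketch {
      final case class Interval(months: Int, microseconds: Long)

      private val MicrosPerSecond: Long = 1000L * 1000
      private val MicrosPerWeek: Long = MicrosPerSecond * 3600 * 24 * 7

      // "interval 3 years -4 month 4 weeks 3 second":
      // years fold into months (1 year = 12 months); weeks and seconds fold
      // into microseconds, since their lengths are fixed.
      def example: Interval =
        Interval(
          months = 3 * 12 - 4,                                   // = 32
          microseconds = 4 * MicrosPerWeek + 3 * MicrosPerSecond // = 2419203000000
        )

      def main(args: Array[String]): Unit =
        println(example) // Interval(32,2419203000000)
    }

After this patch, the parser itself should produce the same values: assuming a working
SQLContext, `sql("select interval 3 years -4 month 4 weeks 3 second")` should yield a
single IntervalType column holding Interval(32, 2419203000000), and writing such a
DataFrame out (e.g. via `df.write.json(...)`) should fail with "Cannot save interval
data type into external storage.", per the ResolvedDataSource check above.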