author     Davies Liu <davies@databricks.com>  2015-06-22 18:03:59 -0700
committer  Davies Liu <davies@databricks.com>  2015-06-22 18:03:59 -0700
commit     6b7f2ceafdcbb014791909747c2210b527305df9 (patch)
tree       7a99294d26f19626b878fe6c8c6d889884f80df8
parent     860a49ef20cea5711a7f54de0053ea33647e56a7 (diff)
[SPARK-8307] [SQL] improve timestamp from parquet
This PR changes the code to convert Julian day to Unix timestamp directly (without Calendar and Timestamp).

cc adrian-wang rxin

Author: Davies Liu <davies@databricks.com>

Closes #6759 from davies/improve_ts and squashes the following commits:

849e301 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
b0e4cad [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
8e2d56f [Davies Liu] address comments
634b9f5 [Davies Liu] fix mima
4891efb [Davies Liu] address comment
bfc437c [Davies Liu] fix build
ae5979c [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
602b969 [Davies Liu] remove jodd
2f2e48c [Davies Liu] fix test
8ace611 [Davies Liu] fix mima
212143b [Davies Liu] fix mina
c834108 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
a3171b8 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
5233974 [Davies Liu] fix scala style
361fd62 [Davies Liu] address comments
ea196d4 [Davies Liu] improve timestamp from parquet
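
For context, a minimal self-contained sketch of the new conversion path (mirroring the readTimestamp and DateTimeUtils changes in the diff below; the constant values are copied from the patch, while the object and method names here are illustrative only):

import java.nio.{ByteBuffer, ByteOrder}

object TimestampSketch {
  // Constants copied from the new DateTimeUtils in this patch
  final val JULIAN_DAY_OF_EPOCH = 2440587          // Julian day of 1970-01-01 (midnight falls at 2440587.5)
  final val SECONDS_PER_DAY = 60 * 60 * 24L
  final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L

  // Julian day + nanoseconds-in-day -> 100ns intervals since the Unix epoch,
  // computed with plain Long arithmetic (no Calendar, no java.sql.Timestamp)
  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
    // Julian days start at noon, so shift back half a day to align with midnight UTC
    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
    seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
  }

  // A Parquet INT96 timestamp is 12 little-endian bytes: 8 bytes of time-of-day nanos, then 4 bytes of Julian day
  def readInt96(bytes: Array[Byte]): Long = {
    require(bytes.length == 12, "INT96 timestamps must be 12 bytes")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong
    val julianDay = buf.getInt
    fromJulianDay(julianDay, timeOfDayNanos)
  }
}

Working entirely in Long arithmetic avoids allocating a Calendar and a java.sql.Timestamp per value, which is what this PR removes.
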
-rw-r--r--  pom.xml  1
-rw-r--r--  project/MimaExcludes.scala  12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala  14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala  16
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala)  41
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala  11
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala  6
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala  10
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala)  28
-rw-r--r--  sql/core/pom.xml  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala  86
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala  19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala  69
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala  20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala  11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala  20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala  4
24 files changed, 175 insertions, 252 deletions
diff --git a/pom.xml b/pom.xml
index 6d4f717d49..80cacb5ace 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,6 @@
<scala.binary.version>2.10</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
- <jodd.version>3.6.3</jodd.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.7</snappy.version>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 015d0296dd..7a748fb5e3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,17 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
// SQL execution is considered private.
- excludePackage("org.apache.spark.sql.execution")
+ excludePackage("org.apache.spark.sql.execution"),
+ // NanoTime and CatalystTimestampConverter are only used inside catalyst,
+ // not needed anymore
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.timestamp.NanoTime"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.timestamp.NanoTime$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
)
case v if v.startsWith("1.4") =>
Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 620e8de83a..429fc4077b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst
import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
-import java.sql.{Timestamp, Date}
+import java.sql.{Date, Timestamp}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable
import scala.collection.mutable.HashMap
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -272,18 +272,18 @@ object CatalystTypeConverters {
}
private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
- override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
+ override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
- if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
+ if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
}
private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
override def toCatalystImpl(scalaValue: Timestamp): Long =
- DateUtils.fromJavaTimestamp(scalaValue)
+ DateTimeUtils.fromJavaTimestamp(scalaValue)
override def toScala(catalystValue: Any): Timestamp =
if (catalystValue == null) null
- else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+ else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
toScala(row.getLong(column))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index ad920f2878..d271434a30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -24,7 +24,7 @@ import java.text.{DateFormat, SimpleDateFormat}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -115,9 +115,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
- case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
+ case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
case TimestampType => buildCast[Long](_,
- t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
+ t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
}
@@ -162,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
if (periodIdx != -1 && n.length() - periodIdx > 9) {
n = n.substring(0, periodIdx + 10)
}
- try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+ try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
catch { case _: java.lang.IllegalArgumentException => null }
})
case BooleanType =>
@@ -176,7 +176,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
- buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
+ buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000)
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -225,13 +225,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] def castToDate(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, s =>
- try DateUtils.fromJavaDate(Date.valueOf(s.toString))
+ try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
catch { case _: java.lang.IllegalArgumentException => null }
)
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
- buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
+ buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception,
// so we can return null
@@ -442,7 +442,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (DateType, StringType) =>
defineCodeGen(ctx, ev, c =>
s"""${ctx.stringType}.fromString(
- org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""")
+ org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
// Special handling required for timestamps in hive test cases since the toString function
// does not match the expected output.
case (TimestampType, StringType) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 6c86a47ba2..479224af56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -39,8 +39,8 @@ object Literal {
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
- case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
- case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
+ case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
+ case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
case _ =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5cadc141af..ff79884a44 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -17,18 +17,28 @@
package org.apache.spark.sql.catalyst.util
-import java.sql.{Timestamp, Date}
+import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}
import org.apache.spark.sql.catalyst.expressions.Cast
/**
- * Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
+ * Helper functions for converting between internal and external date and time representations.
+ * Dates are exposed externally as java.sql.Date and are represented internally as the number of
+ * days since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp
+ * and are stored internally as longs, which are capable of storing timestamps with 100 nanosecond
+ * precision.
*/
-object DateUtils {
- private val MILLIS_PER_DAY = 86400000
- private val HUNDRED_NANOS_PER_SECOND = 10000000L
+object DateTimeUtils {
+ final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
+
+ // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
+ final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5
+ final val SECONDS_PER_DAY = 60 * 60 * 24L
+ final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L
+ final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100
+
// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
@@ -117,4 +127,25 @@ object DateUtils {
0L
}
}
+
+ /**
+ * Return the number of 100ns (hundreds of nanoseconds) since epoch from Julian day
+ * and nanoseconds in a day
+ */
+ def fromJulianDay(day: Int, nanoseconds: Long): Long = {
+ // use Long to avoid rounding errors
+ val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
+ seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
+ }
+
+ /**
+ * Return Julian day and nanoseconds in a day from the number of 100ns (hundreds of nanoseconds)
+ */
+ def toJulianDay(num100ns: Long): (Int, Long) = {
+ val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
+ val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
+ val secondsInDay = seconds % SECONDS_PER_DAY
+ val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L
+ (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
+ }
}
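
As a quick hand-worked check of the constants above (not part of the patch): the Unix epoch corresponds to Julian day 2440587 plus half a day of nanoseconds, and the new helper maps it back to 0, matching the test added in DateTimeUtilsSuite below.

// day = JULIAN_DAY_OF_EPOCH = 2440587, nanoseconds = half a day = 43200 * 10^9
// seconds = (2440587 - 2440587) * 86400 - 86400 / 2          = -43200
// result  = -43200 * 10,000,000 + 43,200,000,000,000 / 100   = 0
val epoch = DateTimeUtils.fromJulianDay(2440587, 43200L * 1000 * 1000 * 1000)
assert(epoch == 0L)
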
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index e407f6f166..f3809be722 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Timestamp, Date}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
/**
@@ -156,7 +156,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(cast(sd, DateType), StringType), sd)
checkEvaluation(cast(cast(d, StringType), DateType), 0)
checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
- checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+ checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
// all convert to string type to check
checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -301,9 +301,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.002f)
checkEvaluation(cast(ts, DoubleType), 15.002)
- checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
- checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
- checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+ checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
+ checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
+ DateTimeUtils.fromJavaTimestamp(ts))
+ checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(
cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
millis.toFloat / 1000)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index b6261bfba0..72fec3b86e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{IntegerType, BooleanType}
@@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row)
checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row)
- val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
- val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
+ val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))
+ val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
checkEvaluation(Literal(d1) < Literal(d2), true)
val ts1 = new Timestamp(12)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index d8f3351d6d..c0675f4f4d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -23,7 +23,7 @@ import java.util.Arrays
import org.scalatest.Matchers
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -83,8 +83,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val row = new SpecificMutableRow(fieldTypes)
row.setLong(0, 0)
row.setString(1, "Hello")
- row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01")))
- row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
+ row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
+ row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
val sizeRequired: Int = converter.getSizeRequirement(row)
sizeRequired should be (8 + (8 * 4) +
@@ -98,9 +98,9 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
unsafeRow.getLong(0) should be (0)
unsafeRow.getString(1) should be ("Hello")
// Date is represented as Int in unsafeRow
- DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
+ DateTimeUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
// Timestamp is represented as Long in unsafeRow
- DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
+ DateTimeUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
(Timestamp.valueOf("2015-05-08 08:10:25"))
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 4d8fe4ac5e..03eb64f097 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -21,19 +21,31 @@ import java.sql.Timestamp
import org.apache.spark.SparkFunSuite
-class DateUtilsSuite extends SparkFunSuite {
+class DateTimeUtilsSuite extends SparkFunSuite {
- test("timestamp") {
+ test("timestamp and 100ns") {
val now = new Timestamp(System.currentTimeMillis())
now.setNanos(100)
- val ns = DateUtils.fromJavaTimestamp(now)
- assert(ns % 10000000L == 1)
- assert(DateUtils.toJavaTimestamp(ns) == now)
+ val ns = DateTimeUtils.fromJavaTimestamp(now)
+ assert(ns % 10000000L === 1)
+ assert(DateTimeUtils.toJavaTimestamp(ns) === now)
List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
- val ts = DateUtils.toJavaTimestamp(t)
- assert(DateUtils.fromJavaTimestamp(ts) == t)
- assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
+ val ts = DateTimeUtils.toJavaTimestamp(t)
+ assert(DateTimeUtils.fromJavaTimestamp(ts) === t)
+ assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) === ts)
}
}
+
+ test("100ns and julian day") {
+ val (d, ns) = DateTimeUtils.toJulianDay(0)
+ assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH)
+ assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)
+ assert(DateTimeUtils.fromJulianDay(d, ns) == 0L)
+
+ val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
+ val (d1, ns1) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(t))
+ val t2 = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d1, ns1))
+ assert(t.equals(t2))
+ }
}
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index ed75475a87..8fc16928ad 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -74,11 +74,6 @@
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.jodd</groupId>
- <artifactId>jodd-core</artifactId>
- <version>${jodd.version}</version>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index c8c67ce334..6db551c543 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -148,8 +148,8 @@ object EvaluatePython {
case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
- case (date: Int, DateType) => DateUtils.toJavaDate(date)
- case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
+ case (date: Int, DateType) => DateTimeUtils.toJavaDate(date)
+ case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t)
case (s: UTF8String, StringType) => s.toString
// Pyrolite can handle Timestamp and Decimal
@@ -188,12 +188,12 @@ object EvaluatePython {
}): Row
case (c: java.util.Calendar, DateType) =>
- DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
+ DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
case (c: java.util.Calendar, TimestampType) =>
c.getTimeInMillis * 10000L
case (t: java.sql.Timestamp, TimestampType) =>
- DateUtils.fromJavaTimestamp(t)
+ DateTimeUtils.fromJavaTimestamp(t)
case (_, udt: UserDefinedType[_]) =>
fromJava(obj, udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 226b143923..8b4276b2c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -22,13 +22,13 @@ import java.util.Properties
import org.apache.commons.lang3.StringUtils
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{InternalRow, SpecificMutableRow}
-import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
/**
* Data corresponding to one partition of a JDBCRDD.
@@ -383,10 +383,10 @@ private[sql] class JDBCRDD(
conversions(i) match {
case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos))
case DateConversion =>
- // DateUtils.fromJavaDate does not handle null value, so we need to check it.
+ // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos)
if (dateVal != null) {
- mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal))
+ mutableRow.setInt(i, DateTimeUtils.fromJavaDate(dateVal))
} else {
mutableRow.update(i, null)
}
@@ -421,7 +421,7 @@ private[sql] class JDBCRDD(
case TimestampConversion =>
val t = rs.getTimestamp(pos)
if (t != null) {
- mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t))
+ mutableRow.setLong(i, DateTimeUtils.fromJavaTimestamp(t))
} else {
mutableRow.update(i, null)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 817e8a20b3..6222addc9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -63,10 +63,10 @@ private[sql] object JacksonParser {
null
case (VALUE_STRING, DateType) =>
- DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime)
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
case (VALUE_STRING, TimestampType) =>
- DateUtils.stringToTime(parser.getText).getTime * 10000L
+ DateTimeUtils.stringToTime(parser.getText).getTime * 10000L
case (VALUE_NUMBER_INT, TimestampType) =>
parser.getLongValue * 10000L
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 44594c5080..73d9520d6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -393,8 +393,8 @@ private[sql] object JsonRDD extends Logging {
value match {
// only support string as date
case value: java.lang.String =>
- DateUtils.millisToDays(DateUtils.stringToTime(value).getTime)
- case value: java.sql.Date => DateUtils.fromJavaDate(value)
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(value).getTime)
+ case value: java.sql.Date => DateTimeUtils.fromJavaDate(value)
}
}
@@ -402,7 +402,7 @@ private[sql] object JsonRDD extends Logging {
value match {
case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L
case value: java.lang.Long => value * 10000L
- case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L
+ case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 10000L
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 4da5e96b82..cf7aa44e4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,21 +17,19 @@
package org.apache.spark.sql.parquet
-import java.sql.Timestamp
-import java.util.{TimeZone, Calendar}
+import java.nio.ByteOrder
-import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}
-import jodd.datetime.JDateTime
+import org.apache.parquet.Preconditions
import org.apache.parquet.column.Dictionary
-import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
+import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
-import org.apache.spark.sql.parquet.timestamp.NanoTime
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -269,7 +267,12 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
* Read a Timestamp value from a Parquet Int96Value
*/
protected[parquet] def readTimestamp(value: Binary): Long = {
- DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value))
+ Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes")
+ val buf = value.toByteBuffer
+ buf.order(ByteOrder.LITTLE_ENDIAN)
+ val timeOfDayNanos = buf.getLong
+ val julianDay = buf.getInt
+ DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
}
}
@@ -498,73 +501,6 @@ private[parquet] object CatalystArrayConverter {
val INITIAL_ARRAY_SIZE = 20
}
-private[parquet] object CatalystTimestampConverter {
- // TODO most part of this comes from Hive-0.14
- // Hive code might have some issues, so we need to keep an eye on it.
- // Also we use NanoTime and Int96Values from parquet-examples.
- // We utilize jodd to convert between NanoTime and Timestamp
- val parquetTsCalendar = new ThreadLocal[Calendar]
- def getCalendar: Calendar = {
- // this is a cache for the calendar instance.
- if (parquetTsCalendar.get == null) {
- parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
- }
- parquetTsCalendar.get
- }
- val NANOS_PER_SECOND: Long = 1000000000
- val SECONDS_PER_MINUTE: Long = 60
- val MINUTES_PER_HOUR: Long = 60
- val NANOS_PER_MILLI: Long = 1000000
-
- def convertToTimestamp(value: Binary): Timestamp = {
- val nt = NanoTime.fromBinary(value)
- val timeOfDayNanos = nt.getTimeOfDayNanos
- val julianDay = nt.getJulianDay
- val jDateTime = new JDateTime(julianDay.toDouble)
- val calendar = getCalendar
- calendar.set(Calendar.YEAR, jDateTime.getYear)
- calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
- calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)
-
- // written in command style
- var remainder = timeOfDayNanos
- calendar.set(
- Calendar.HOUR_OF_DAY,
- (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
- remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
- calendar.set(
- Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
- remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
- calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
- val nanos = remainder % NANOS_PER_SECOND
- val ts = new Timestamp(calendar.getTimeInMillis)
- ts.setNanos(nanos.toInt)
- ts
- }
-
- def convertFromTimestamp(ts: Timestamp): Binary = {
- val calendar = getCalendar
- calendar.setTime(ts)
- val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
- calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
- // Hive-0.14 didn't set hour before get day number, while the day number should
- // has something to do with hour, since julian day number grows at 12h GMT
- // here we just follow what hive does.
- val julianDay = jDateTime.getJulianDayNumber
-
- val hour = calendar.get(Calendar.HOUR_OF_DAY)
- val minute = calendar.get(Calendar.MINUTE)
- val second = calendar.get(Calendar.SECOND)
- val nanos = ts.getNanos
- // Hive-0.14 would use hours directly, that might be wrong, since the day starts
- // from 12h in Julian. here we just follow what hive does.
- val nanosOfDay = nanos + second * NANOS_PER_SECOND +
- minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
- hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
- NanoTime(julianDay, nanosOfDay).toBinary
- }
-}
-
/**
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
* match the characteristics of an array (see
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index a8775a2a8f..e65fa0030e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.parquet
+import java.nio.{ByteOrder, ByteBuffer}
import java.util.{HashMap => JHashMap}
import org.apache.hadoop.conf.Configuration
@@ -29,7 +30,7 @@ import org.apache.parquet.schema.MessageType
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -298,7 +299,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
}
// Scratch array used to write decimals as fixed-length binary
- private val scratchBytes = new Array[Byte](8)
+ private[this] val scratchBytes = new Array[Byte](8)
private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
val numBytes = ParquetTypesConverter.BYTES_FOR_PRECISION(precision)
@@ -313,10 +314,16 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
}
+ // array used to write Timestamp as Int96 (fixed-length binary)
+ private[this] val int96buf = new Array[Byte](12)
+
private[parquet] def writeTimestamp(ts: Long): Unit = {
- val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(
- DateUtils.toJavaTimestamp(ts))
- writer.addBinary(binaryNanoTime)
+ val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
+ val buf = ByteBuffer.wrap(int96buf)
+ buf.order(ByteOrder.LITTLE_ENDIAN)
+ buf.putLong(timeOfDayNanos)
+ buf.putInt(julianDay)
+ writer.addBinary(Binary.fromByteArray(int96buf))
}
}
@@ -360,7 +367,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
case DateType => writer.addInteger(record.getInt(index))
- case TimestampType => writeTimestamp(record(index).asInstanceOf[Long])
+ case TimestampType => writeTimestamp(record.getLong(index))
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
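
For reference, a minimal sketch of the byte packing performed by writeTimestamp above, without the Parquet writer classes (the object and method names are illustrative, not part of the patch):

import java.nio.{ByteBuffer, ByteOrder}

object Int96WriteSketch {
  // Pack (julianDay, timeOfDayNanos) into the 12-byte INT96 layout Parquet expects:
  // 8 bytes of time-of-day nanoseconds first, then 4 bytes of Julian day, both little-endian.
  def toInt96Bytes(julianDay: Int, timeOfDayNanos: Long): Array[Byte] = {
    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(timeOfDayNanos)
    buf.putInt(julianDay)
    buf.array()
  }
}

The read side in ParquetConverter.readTimestamp reverses exactly this layout before calling DateTimeUtils.fromJulianDay.
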
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
deleted file mode 100644
index 4d5ed211ad..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet.timestamp
-
-import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.parquet.Preconditions
-import org.apache.parquet.io.api.{Binary, RecordConsumer}
-
-private[parquet] class NanoTime extends Serializable {
- private var julianDay = 0
- private var timeOfDayNanos = 0L
-
- def set(julianDay: Int, timeOfDayNanos: Long): this.type = {
- this.julianDay = julianDay
- this.timeOfDayNanos = timeOfDayNanos
- this
- }
-
- def getJulianDay: Int = julianDay
-
- def getTimeOfDayNanos: Long = timeOfDayNanos
-
- def toBinary: Binary = {
- val buf = ByteBuffer.allocate(12)
- buf.order(ByteOrder.LITTLE_ENDIAN)
- buf.putLong(timeOfDayNanos)
- buf.putInt(julianDay)
- buf.flip()
- Binary.fromByteBuffer(buf)
- }
-
- def writeValue(recordConsumer: RecordConsumer): Unit = {
- recordConsumer.addBinary(toBinary)
- }
-
- override def toString: String =
- "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
-}
-
-private[sql] object NanoTime {
- def fromBinary(bytes: Binary): NanoTime = {
- Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
- val buf = bytes.toByteBuffer
- buf.order(ByteOrder.LITTLE_ENDIAN)
- val timeOfDayNanos = buf.getLong
- val julianDay = buf.getInt
- new NanoTime().set(julianDay, timeOfDayNanos)
- }
-
- def apply(julianDay: Int, timeOfDayNanos: Long): NanoTime = {
- new NanoTime().set(julianDay, timeOfDayNanos)
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index c32d9f88dd..8204a58417 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -25,7 +25,7 @@ import org.scalactic.Tolerance._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.json.InferSchema.compatibleType
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
@@ -76,26 +76,28 @@ class JsonSuite extends QueryTest with TestJsonData {
checkTypePromotion(
Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited))
- checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+ checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)),
enforceCorrectType(intNumber, TimestampType))
- checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
+ checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
enforceCorrectType(intNumber.toLong, TimestampType))
val strTime = "2014-09-30 12:34:56"
- checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+ checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
enforceCorrectType(strTime, TimestampType))
val strDate = "2014-10-15"
checkTypePromotion(
- DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
+ DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
- checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)),
+ checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
enforceCorrectType(ISO8601Time1, TimestampType))
- checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
+ checkTypePromotion(DateTimeUtils.millisToDays(3601000),
+ enforceCorrectType(ISO8601Time1, DateType))
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
- checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)),
+ checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
enforceCorrectType(ISO8601Time2, TimestampType))
- checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
+ checkTypePromotion(DateTimeUtils.millisToDays(10801000),
+ enforceCorrectType(ISO8601Time2, DateType))
}
test("Get compatible type") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 284d99d493..47a7be1c6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
@@ -137,7 +137,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
def makeDateRDD(): DataFrame =
sqlContext.sparkContext
.parallelize(0 to 1000)
- .map(i => Tuple1(DateUtils.toJavaDate(i)))
+ .map(i => Tuple1(DateTimeUtils.toJavaDate(i)))
.toDF()
.select($"_1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 4887577322..79eac930e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -17,13 +17,12 @@
package org.apache.spark.sql.sources
-import java.sql.{Timestamp, Date}
-
+import java.sql.{Date, Timestamp}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -84,8 +83,8 @@ case class AllDataTypesScan(
i.toDouble,
Decimal(new java.math.BigDecimal(i)),
Decimal(new java.math.BigDecimal(i)),
- DateUtils.fromJavaDate(new Date(1970, 1, 1)),
- DateUtils.fromJavaTimestamp(new Timestamp(20000 + i)),
+ DateTimeUtils.fromJavaDate(new Date(1970, 1, 1)),
+ DateTimeUtils.fromJavaTimestamp(new Timestamp(20000 + i)),
UTF8String.fromString(s"varchar_$i"),
Seq(i, i + 1),
Seq(Map(UTF8String.fromString(s"str_$i") -> InternalRow(i.toLong))),
@@ -93,7 +92,7 @@ case class AllDataTypesScan(
Map(Map(UTF8String.fromString(s"str_$i") -> i.toFloat) -> InternalRow(i.toLong)),
Row(i, i.toString),
Row(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")),
- InternalRow(Seq(DateUtils.fromJavaDate(new Date(1970, 1, i + 1))))))
+ InternalRow(Seq(DateTimeUtils.fromJavaDate(new Date(1970, 1, i + 1))))))
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index d4f1ae8ee0..864c888ab0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -273,7 +273,7 @@ private[hive] trait HiveInspectors {
System.arraycopy(writable.getBytes, 0, temp, 0, temp.length)
temp
case poi: WritableConstantDateObjectInspector =>
- DateUtils.fromJavaDate(poi.getWritableConstantValue.get())
+ DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get())
case mi: StandardConstantMapObjectInspector =>
// take the value from the map inspector object, rather than the input data
mi.getWritableConstantValue.map { case (k, v) =>
@@ -313,13 +313,13 @@ private[hive] trait HiveInspectors {
System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength())
result
case x: DateObjectInspector if x.preferWritable() =>
- DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
- case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
+ DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
+ case x: DateObjectInspector => DateTimeUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
case x: TimestampObjectInspector if x.preferWritable() =>
val t = x.getPrimitiveWritableObject(data)
t.getSeconds * 10000000L + t.getNanos / 100
case ti: TimestampObjectInspector =>
- DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
+ DateTimeUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
case _ => pi.getPrimitiveJavaObject(data)
}
case li: ListObjectInspector =>
@@ -356,10 +356,10 @@ private[hive] trait HiveInspectors {
(o: Any) => HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal)
case _: JavaDateObjectInspector =>
- (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
+ (o: Any) => DateTimeUtils.toJavaDate(o.asInstanceOf[Int])
case _: JavaTimestampObjectInspector =>
- (o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long])
+ (o: Any) => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long])
case soi: StandardStructObjectInspector =>
val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
@@ -468,9 +468,9 @@ private[hive] trait HiveInspectors {
case _: BinaryObjectInspector if x.preferWritable() => getBinaryWritable(a)
case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]]
case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
- case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
+ case _: DateObjectInspector => DateTimeUtils.toJavaDate(a.asInstanceOf[Int])
case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
- case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long])
+ case _: TimestampObjectInspector => DateTimeUtils.toJavaTimestamp(a.asInstanceOf[Long])
}
case x: SettableStructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
@@ -781,7 +781,7 @@ private[hive] trait HiveInspectors {
if (value == null) {
null
} else {
- new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long]))
+ new hiveIo.TimestampWritable(DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]))
}
private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 439f39bafc..00e61e35d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
@@ -362,10 +362,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
case oi: TimestampObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
- row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
+ row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
case oi: DateObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
- row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+ row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
case oi: BinaryObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
row.update(ordinal, oi.getPrimitiveJavaObject(value))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8b928861fc..ab75b12e2a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.common.FileUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableJobConf
@@ -201,7 +201,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
def convertToHiveRawString(col: String, value: Any): String = {
val raw = String.valueOf(value)
schema(col).dataType match {
- case DateType => DateUtils.toString(raw.toInt)
+ case DateType => DateTimeUtils.toString(raw.toInt)
case _: DecimalType => BigDecimal(raw).toString()
case _ => raw
}