Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala | 35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala | 59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala | 46
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala | 436
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala | 39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala | 321
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala | 160
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala | 32
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 63
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 46
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala | 44
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala | 28
18 files changed, 709 insertions, 693 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 909b8e31f2..c11dab35cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -108,7 +108,9 @@ final class Decimal extends Ordered[Decimal] with Serializable {
*/
def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
this.decimalVal = decimal.setScale(scale, ROUNDING_MODE)
- require(decimalVal.precision <= precision, "Overflowed precision")
+ require(
+ decimalVal.precision <= precision,
+ s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
this.longVal = 0L
this._precision = precision
this._scale = scale
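
A minimal standalone sketch (not part of the patch; values are illustrative) of how the sharper message surfaces when a value overflows the declared precision:

import org.apache.spark.sql.types.Decimal

val d = new Decimal()
try {
  // 12345.678 needs precision 8, so a declared precision of 6 overflows.
  d.set(BigDecimal("12345.678"), 6, 3)
} catch {
  case e: IllegalArgumentException =>
    // Prints something like:
    // "requirement failed: Decimal precision 8 exceeds max precision 6"
    println(e.getMessage)
}
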
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 8f0f8910b3..47397c4be3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -292,10 +292,9 @@ private[spark] object SQLConf {
val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
key = "spark.sql.parquet.writeLegacyFormat",
- defaultValue = Some(true),
+ defaultValue = Some(false),
doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
- "Spark SQL schema and vice versa.",
- isPublic = false)
+ "Spark SQL schema and vice versa.")
val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
key = "spark.sql.parquet.output.committer.class",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 5325698034..a958373eb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -95,7 +95,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
""".stripMargin
}
- new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+ new CatalystRecordMaterializer(
+ parquetRequestedSchema,
+ CatalystReadSupport.expandUDT(catalystRequestedSchema))
}
}
@@ -110,7 +112,10 @@ private[parquet] object CatalystReadSupport {
*/
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
- Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+ Types
+ .buildMessage()
+ .addFields(clippedParquetFields: _*)
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
@@ -271,4 +276,30 @@ private[parquet] object CatalystReadSupport {
.getOrElse(toParquet.convertField(f))
}
}
+
+ def expandUDT(schema: StructType): StructType = {
+ def expand(dataType: DataType): DataType = {
+ dataType match {
+ case t: ArrayType =>
+ t.copy(elementType = expand(t.elementType))
+
+ case t: MapType =>
+ t.copy(
+ keyType = expand(t.keyType),
+ valueType = expand(t.valueType))
+
+ case t: StructType =>
+ val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+ t.copy(fields = expandedFields)
+
+ case t: UserDefinedType[_] =>
+ t.sqlType
+
+ case t =>
+ t
+ }
+ }
+
+ expand(schema).asInstanceOf[StructType]
+ }
}
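
A sketch (with a hypothetical PointUDT, not from the patch) of what expandUDT does: every UserDefinedType in the requested Catalyst schema is replaced by its underlying sqlType before the record materializer is built, so CatalystRowConverter only ever sees built-in types:

import org.apache.spark.sql.types._

class PointUDT extends UserDefinedType[(Double, Double)] {
  override def sqlType: DataType =
    StructType(Seq(StructField("x", DoubleType), StructField("y", DoubleType)))
  override def serialize(obj: Any): Any = ???
  override def deserialize(datum: Any): (Double, Double) = ???
  override def userClass: Class[(Double, Double)] = classOf[(Double, Double)]
}

val requested = StructType(Seq(StructField("point", new PointUDT, nullable = true)))
// expandUDT(requested) yields the same schema with the UDT column flattened to its sqlType:
//   StructType(StructField("point", StructType(x: double, y: double), nullable = true))
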
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 050d3610a6..247d35363b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
-import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
import org.apache.spark.Logging
@@ -114,7 +113,8 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
* any "parent" container.
*
* @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record type
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
+ * types should have been expanded.
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class CatalystRowConverter(
@@ -133,6 +133,12 @@ private[parquet] class CatalystRowConverter(
|${catalystType.prettyJson}
""".stripMargin)
+ assert(
+ !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
+ s"""User-defined types in Catalyst schema should have already been expanded:
+ |${catalystType.prettyJson}
+ """.stripMargin)
+
logDebug(
s"""Building row converter for the following schema:
|
@@ -268,13 +274,6 @@ private[parquet] class CatalystRowConverter(
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})
- case t: UserDefinedType[_] =>
- val catalystTypeForUDT = t.sqlType
- val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
- val field = StructField("udt", catalystTypeForUDT, nullable)
- val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
- newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
-
case _ =>
throw new RuntimeException(
s"Unable to create Parquet converter for data type ${catalystType.json}")
@@ -340,30 +339,36 @@ private[parquet] class CatalystRowConverter(
val scale = decimalType.scale
if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
- // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
- // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
- // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
- val buffer = value.toByteBuffer
- val bytes = buffer.array()
- val start = buffer.position()
- val end = buffer.limit()
-
- var unscaled = 0L
- var i = start
-
- while (i < end) {
- unscaled = (unscaled << 8) | (bytes(i) & 0xff)
- i += 1
- }
-
- val bits = 8 * (end - start)
- unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+ // Constructs a `Decimal` with an unscaled `Long` value if possible.
+ val unscaled = binaryToUnscaledLong(value)
Decimal(unscaled, precision, scale)
} else {
// Otherwise, resorts to an unscaled `BigInteger` instead.
Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
}
}
+
+ private def binaryToUnscaledLong(binary: Binary): Long = {
+ // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+ // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+ // copying it.
+ val buffer = binary.toByteBuffer
+ val bytes = buffer.array()
+ val start = buffer.position()
+ val end = buffer.limit()
+
+ var unscaled = 0L
+ var i = start
+
+ while (i < end) {
+ unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+ i += 1
+ }
+
+ val bits = 8 * (end - start)
+ unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+ unscaled
+ }
}
/**
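
A standalone sketch of the sign-extension trick binaryToUnscaledLong relies on: the big-endian unscaled bytes are packed into the low bits of a Long, shifted to the top of the word, then arithmetically shifted back so the original sign bit fills the high bits:

def unscaledLongFromBytes(bytes: Array[Byte]): Long = {
  var unscaled = 0L
  bytes.foreach(b => unscaled = (unscaled << 8) | (b & 0xff))
  val bits = 8 * bytes.length
  (unscaled << (64 - bits)) >> (64 - bits)
}

// A two-byte decimal holding -1 is stored as 0xFF 0xFF ...
assert(unscaledLongFromBytes(Array(0xff.toByte, 0xff.toByte)) == -1L)
// ... while 0x00 0xFF is +255.
assert(unscaledLongFromBytes(Array(0x00.toByte, 0xff.toByte)) == 255L)
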
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 6904fc736c..7f3394c20e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -121,7 +121,7 @@ private[parquet] class CatalystSchemaConverter(
val precision = field.getDecimalMetadata.getPrecision
val scale = field.getDecimalMetadata.getScale
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
@@ -155,7 +155,7 @@ private[parquet] class CatalystSchemaConverter(
}
case INT96 =>
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
assumeInt96IsTimestamp,
"INT96 is not supported unless it's interpreted as timestamp. " +
s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
@@ -197,11 +197,11 @@ private[parquet] class CatalystSchemaConverter(
//
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
case LIST =>
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
field.getFieldCount == 1, s"Invalid list type $field")
val repeatedType = field.getType(0)
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
if (isElementType(repeatedType, field.getName)) {
@@ -217,17 +217,17 @@ private[parquet] class CatalystSchemaConverter(
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
// scalastyle:on
case MAP | MAP_KEY_VALUE =>
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
field.getFieldCount == 1 && !field.getType(0).isPrimitive,
s"Invalid map type: $field")
val keyValueType = field.getType(0).asGroupType()
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
s"Invalid map type: $field")
val keyType = keyValueType.getType(0)
- CatalystSchemaConverter.analysisRequire(
+ CatalystSchemaConverter.checkConversionRequirement(
keyType.isPrimitive,
s"Map key type is expected to be a primitive type, but found: $keyType")
@@ -299,7 +299,10 @@ private[parquet] class CatalystSchemaConverter(
* Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
*/
def convert(catalystSchema: StructType): MessageType = {
- Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+ Types
+ .buildMessage()
+ .addFields(catalystSchema.map(convertField): _*)
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
/**
@@ -347,10 +350,10 @@ private[parquet] class CatalystSchemaConverter(
// NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
//
// As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
- // timestamp in Impala for some historical reasons, it's not recommended to be used for any
- // other types and will probably be deprecated in future Parquet format spec. That's the
- // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
- // are both logical types annotating `INT64`.
+ // timestamp in Impala for some historical reasons. It's not recommended to be used for any
+ // other types and will probably be deprecated in some future version of parquet-format spec.
+ // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+ // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
//
// Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
// from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
@@ -361,7 +364,7 @@ private[parquet] class CatalystSchemaConverter(
// currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet.
//
- // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+ // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
@@ -523,11 +526,12 @@ private[parquet] class CatalystSchemaConverter(
}
}
-
private[parquet] object CatalystSchemaConverter {
+ val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
- analysisRequire(
+ checkConversionRequirement(
!name.matches(".*[ ,;{}()\n\t=].*"),
s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
|Please use alias to rename it.
@@ -539,7 +543,7 @@ private[parquet] object CatalystSchemaConverter {
schema
}
- def analysisRequire(f: => Boolean, message: String): Unit = {
+ def checkConversionRequirement(f: => Boolean, message: String): Unit = {
if (!f) {
throw new AnalysisException(message)
}
@@ -553,16 +557,8 @@ private[parquet] object CatalystSchemaConverter {
numBytes
}
- private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
// Returns the minimum number of bytes needed to store a decimal with a given `precision`.
- def minBytesForPrecision(precision : Int) : Int = {
- if (precision < MIN_BYTES_FOR_PRECISION.length) {
- MIN_BYTES_FOR_PRECISION(precision)
- } else {
- computeMinBytesForPrecision(precision)
- }
- }
+ val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
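
A sketch of the computation behind the precomputed minBytesForPrecision table: the smallest byte count whose signed two's-complement range covers every unscaled value of a given decimal precision (the loop mirrors computeMinBytesForPrecision above):

def computeMinBytes(precision: Int): Int = {
  var numBytes = 1
  // Grow until an n-byte signed integer (max 2^(8n - 1) - 1) can hold 10^precision - 1.
  while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
    numBytes += 1
  }
  numBytes
}

val minBytes = Array.tabulate[Int](39)(computeMinBytes)
assert(minBytes(9) == 4)    // precision <= 9 fits an INT32
assert(minBytes(18) == 8)   // precision <= 18 fits an INT64
assert(minBytes(38) == 16)  // the maximum precision needs a 16-byte FIXED_LEN_BYTE_ARRAY
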
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
new file mode 100644
index 0000000000..483363d2c1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.{Binary, RecordConsumer}
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, minBytesForPrecision}
+import org.apache.spark.sql.types._
+
+/**
+ * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet
+ * messages. This class can write Parquet data in two modes:
+ *
+ * - Standard mode: Parquet data are written in standard format defined in parquet-format spec.
+ * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior.
+ *
+ * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyFormat`. The value
+ * of this option is propagated to this class by the `init()` method and its Hadoop configuration
+ * argument.
+ */
+private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
+ // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
+ // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
+ // data in `ArrayData` without the help of `SpecificMutableRow`.
+ private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+ // Schema of the `InternalRow`s to be written
+ private var schema: StructType = _
+
+ // `ValueWriter`s for all fields of the schema
+ private var rootFieldWriters: Seq[ValueWriter] = _
+
+ // The Parquet `RecordConsumer` to which all `InternalRow`s are written
+ private var recordConsumer: RecordConsumer = _
+
+ // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
+ private var writeLegacyParquetFormat: Boolean = _
+
+ // Reusable byte array used to write timestamps as Parquet INT96 values
+ private val timestampBuffer = new Array[Byte](12)
+
+ // Reusable byte array used to write decimal values
+ private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
+
+ override def init(configuration: Configuration): WriteContext = {
+ val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
+ this.schema = StructType.fromString(schemaString)
+ this.writeLegacyParquetFormat = {
+ // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
+ assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
+ configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
+ }
+ this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+
+ val messageType = new CatalystSchemaConverter(configuration).convert(schema)
+ val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+
+ logInfo(
+ s"""Initialized Parquet WriteSupport with Catalyst schema:
+ |${schema.prettyJson}
+ |and corresponding Parquet message type:
+ |$messageType
+ """.stripMargin)
+
+ new WriteContext(messageType, metadata)
+ }
+
+ override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+ this.recordConsumer = recordConsumer
+ }
+
+ override def write(row: InternalRow): Unit = {
+ consumeMessage {
+ writeFields(row, schema, rootFieldWriters)
+ }
+ }
+
+ private def writeFields(
+ row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+ var i = 0
+ while (i < row.numFields) {
+ if (!row.isNullAt(i)) {
+ consumeField(schema(i).name, i) {
+ fieldWriters(i).apply(row, i)
+ }
+ }
+ i += 1
+ }
+ }
+
+ private def makeWriter(dataType: DataType): ValueWriter = {
+ dataType match {
+ case BooleanType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addBoolean(row.getBoolean(ordinal))
+
+ case ByteType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addInteger(row.getByte(ordinal))
+
+ case ShortType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addInteger(row.getShort(ordinal))
+
+ case IntegerType | DateType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addInteger(row.getInt(ordinal))
+
+ case LongType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addLong(row.getLong(ordinal))
+
+ case FloatType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addFloat(row.getFloat(ordinal))
+
+ case DoubleType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addDouble(row.getDouble(ordinal))
+
+ case StringType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+
+ case TimestampType =>
+ (row: SpecializedGetters, ordinal: Int) => {
+ // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
+ // Currently we only support timestamps stored as INT96, which is compatible with Hive
+ // and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
+ // defined in the parquet-format spec. But as of this writing, the most recent parquet-mr
+ // version (1.8.1) hasn't implemented it yet.
+
+ // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
+ // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
+ val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+ val buf = ByteBuffer.wrap(timestampBuffer)
+ buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+ recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+ }
+
+ case BinaryType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+
+ case DecimalType.Fixed(precision, scale) =>
+ makeDecimalWriter(precision, scale)
+
+ case t: StructType =>
+ val fieldWriters = t.map(_.dataType).map(makeWriter)
+ (row: SpecializedGetters, ordinal: Int) =>
+ consumeGroup {
+ writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
+ }
+
+ case t: ArrayType => makeArrayWriter(t)
+
+ case t: MapType => makeMapWriter(t)
+
+ case t: UserDefinedType[_] => makeWriter(t.sqlType)
+
+ // TODO Adds IntervalType support
+ case _ => sys.error(s"Unsupported data type $dataType.")
+ }
+ }
+
+ private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
+ assert(
+ precision <= DecimalType.MAX_PRECISION,
+ s"Decimal precision $precision exceeds max precision ${DecimalType.MAX_PRECISION}")
+
+ val numBytes = minBytesForPrecision(precision)
+
+ val int32Writer =
+ (row: SpecializedGetters, ordinal: Int) => {
+ val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+ recordConsumer.addInteger(unscaledLong.toInt)
+ }
+
+ val int64Writer =
+ (row: SpecializedGetters, ordinal: Int) => {
+ val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+ recordConsumer.addLong(unscaledLong)
+ }
+
+ val binaryWriterUsingUnscaledLong =
+ (row: SpecializedGetters, ordinal: Int) => {
+ // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we
+ // can build a fixed-length byte array with length `numBytes` using the unscaled `Long`
+ // value and the `decimalBuffer` for better performance.
+ val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+ var i = 0
+ var shift = 8 * (numBytes - 1)
+
+ while (i < numBytes) {
+ decimalBuffer(i) = (unscaled >> shift).toByte
+ i += 1
+ shift -= 8
+ }
+
+ recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+ }
+
+ val binaryWriterUsingUnscaledBytes =
+ (row: SpecializedGetters, ordinal: Int) => {
+ val decimal = row.getDecimal(ordinal, precision, scale)
+ val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray
+ val fixedLengthBytes = if (bytes.length == numBytes) {
+ // If the length of the underlying byte array of the unscaled `BigInteger` happens to be
+ // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
+ bytes
+ } else {
+ // Otherwise, the length must be less than `numBytes`. In this case we copy contents of
+ // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
+ // fixed-length byte array.
+ val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+ util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
+ System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
+ decimalBuffer
+ }
+
+ recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+ }
+
+ writeLegacyParquetFormat match {
+ // Standard mode, 1 <= precision <= 9, writes as INT32
+ case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer
+
+ // Standard mode, 10 <= precision <= 18, writes as INT64
+ case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer
+
+ // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
+ case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong
+
+ // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
+ case _ => binaryWriterUsingUnscaledBytes
+ }
+ }
+
+ def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
+ val elementWriter = makeWriter(arrayType.elementType)
+
+ def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter =
+ (row: SpecializedGetters, ordinal: Int) => {
+ val array = row.getArray(ordinal)
+ consumeGroup {
+ // Only creates the repeated field if the array is non-empty.
+ if (array.numElements() > 0) {
+ consumeField(repeatedGroupName, 0) {
+ var i = 0
+ while (i < array.numElements()) {
+ consumeGroup {
+ // Only creates the element field if the current array element is not null.
+ if (!array.isNullAt(i)) {
+ consumeField(elementFieldName, 0) {
+ elementWriter.apply(array, i)
+ }
+ }
+ }
+ i += 1
+ }
+ }
+ }
+ }
+ }
+
+ def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter =
+ (row: SpecializedGetters, ordinal: Int) => {
+ val array = row.getArray(ordinal)
+ consumeGroup {
+ // Only creates the repeated field if the array is non-empty.
+ if (array.numElements() > 0) {
+ consumeField(repeatedFieldName, 0) {
+ var i = 0
+ while (i < array.numElements()) {
+ elementWriter.apply(array, i)
+ i += 1
+ }
+ }
+ }
+ }
+ }
+
+ (writeLegacyParquetFormat, arrayType.containsNull) match {
+ case (legacyMode @ false, _) =>
+ // Standard mode:
+ //
+ // <list-repetition> group <name> (LIST) {
+ // repeated group list {
+ // ^~~~ repeatedGroupName
+ // <element-repetition> <element-type> element;
+ // ^~~~~~~ elementFieldName
+ // }
+ // }
+ threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = "element")
+
+ case (legacyMode @ true, nullableElements @ true) =>
+ // Legacy mode, with nullable elements:
+ //
+ // <list-repetition> group <name> (LIST) {
+ // optional group bag {
+ // ^~~ repeatedGroupName
+ // repeated <element-type> array;
+ // ^~~~~ elementFieldName
+ // }
+ // }
+ threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = "array")
+
+ case (legacyMode @ true, nullableElements @ false) =>
+ // Legacy mode, with non-nullable elements:
+ //
+ // <list-repetition> group <name> (LIST) {
+ // repeated <element-type> array;
+ // ^~~~~ repeatedFieldName
+ // }
+ twoLevelArrayWriter(repeatedFieldName = "array")
+ }
+ }
+
+ private def makeMapWriter(mapType: MapType): ValueWriter = {
+ val keyWriter = makeWriter(mapType.keyType)
+ val valueWriter = makeWriter(mapType.valueType)
+ val repeatedGroupName = if (writeLegacyParquetFormat) {
+ // Legacy mode:
+ //
+ // <map-repetition> group <name> (MAP) {
+ // repeated group map (MAP_KEY_VALUE) {
+ // ^~~ repeatedGroupName
+ // required <key-type> key;
+ // <value-repetition> <value-type> value;
+ // }
+ // }
+ "map"
+ } else {
+ // Standard mode:
+ //
+ // <map-repetition> group <name> (MAP) {
+ // repeated group key_value {
+ // ^~~~~~~~~ repeatedGroupName
+ // required <key-type> key;
+ // <value-repetition> <value-type> value;
+ // }
+ // }
+ "key_value"
+ }
+
+ (row: SpecializedGetters, ordinal: Int) => {
+ val map = row.getMap(ordinal)
+ val keyArray = map.keyArray()
+ val valueArray = map.valueArray()
+
+ consumeGroup {
+ // Only creates the repeated field if the map is non-empty.
+ if (map.numElements() > 0) {
+ consumeField(repeatedGroupName, 0) {
+ var i = 0
+ while (i < map.numElements()) {
+ consumeGroup {
+ consumeField("key", 0) {
+ keyWriter.apply(keyArray, i)
+ }
+
+ // Only creates the "value" field if the value if non-empty
+ if (!map.valueArray().isNullAt(i)) {
+ consumeField("value", 1) {
+ valueWriter.apply(valueArray, i)
+ }
+ }
+ }
+ i += 1
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private def consumeMessage(f: => Unit): Unit = {
+ recordConsumer.startMessage()
+ f
+ recordConsumer.endMessage()
+ }
+
+ private def consumeGroup(f: => Unit): Unit = {
+ recordConsumer.startGroup()
+ f
+ recordConsumer.endGroup()
+ }
+
+ private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
+ recordConsumer.startField(field, index)
+ f
+ recordConsumer.endField(field, index)
+ }
+}
+
+private[parquet] object CatalystWriteSupport {
+ val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
+ def setSchema(schema: StructType, configuration: Configuration): Unit = {
+ schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
+ configuration.set(SPARK_ROW_SCHEMA, schema.json)
+ configuration.set(
+ ParquetOutputFormat.WRITER_VERSION,
+ ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+ }
+}
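
A standalone sketch of the 12-byte INT96 timestamp layout the new write support emits (little-endian nanoseconds-of-day in the first 8 bytes, little-endian Julian day in the last 4), using the same DateTimeUtils.toJulianDay helper the write support calls:

import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

def toInt96Bytes(microsSinceEpoch: Long): Array[Byte] = {
  val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(microsSinceEpoch)
  val buf = ByteBuffer.wrap(new Array[Byte](12)).order(ByteOrder.LITTLE_ENDIAN)
  buf.putLong(timeOfDayNanos).putInt(julianDay)
  buf.array()
}
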
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index de1fd0166a..300e8677b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
*
* NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
* no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- * left * empty).
+ * left empty).
*/
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
deleted file mode 100644
index ccd7ebf319..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{MapData, ArrayData}
-
-// TODO Removes this while fixing SPARK-8848
-private[sql] object CatalystConverter {
- // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
- // Note that "array" for the array elements is chosen by ParquetAvro.
- // Using a different value will result in Parquet silently dropping columns.
- val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
- val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
-
- val MAP_KEY_SCHEMA_NAME = "key"
- val MAP_VALUE_SCHEMA_NAME = "value"
- val MAP_SCHEMA_NAME = "map"
-
- // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
- type ArrayScalaType = ArrayData
- type StructScalaType = InternalRow
- type MapScalaType = MapData
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c6b3fe7900..78040d99fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -18,24 +18,17 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.Serializable
-import java.nio.ByteBuffer
-import com.google.common.io.BaseEncoding
-import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
private[sql] object ParquetFilters {
- val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-
case class SetInFilter[T <: Comparable[T]](
valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
@@ -282,33 +275,4 @@ private[sql] object ParquetFilters {
addMethod.setAccessible(true)
addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
}
-
- /**
- * Note: Inside the Hadoop API we only have access to `Configuration`, not to
- * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
- * the actual filter predicate.
- */
- def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
- if (filters.nonEmpty) {
- val serialized: Array[Byte] =
- SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
- val encoded: String = BaseEncoding.base64().encode(serialized)
- conf.set(PARQUET_FILTER_DATA, encoded)
- }
- }
-
- /**
- * Note: Inside the Hadoop API we only have access to `Configuration`, not to
- * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
- * the actual filter predicate.
- */
- def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
- val data = conf.get(PARQUET_FILTER_DATA)
- if (data != null) {
- val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
- SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
- } else {
- Seq()
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 8a9c0e733a..77d851ca48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -218,8 +218,8 @@ private[sql] class ParquetRelation(
}
// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
- val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
- if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
+ val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
+ if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[DirectParquetOutputCommitter].getCanonicalName)
}
@@ -248,18 +248,22 @@ private[sql] class ParquetRelation(
// bundled with `ParquetOutputFormat[Row]`.
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
- // TODO There's no need to use two kinds of WriteSupport
- // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
- // complex types.
- val writeSupportClass =
- if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
- classOf[MutableRowWriteSupport]
- } else {
- classOf[RowWriteSupport]
- }
+ ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+ CatalystWriteSupport.setSchema(dataSchema, conf)
+
+ // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
+ // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+ conf.set(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sqlContext.conf.isParquetBinaryAsString.toString)
- ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
- RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+ conf.set(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sqlContext.conf.isParquetINT96AsTimestamp.toString)
+
+ conf.set(
+ SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+ sqlContext.conf.writeLegacyParquetFormat.toString)
// Sets compression scheme
conf.set(
@@ -287,7 +291,6 @@ private[sql] class ParquetRelation(
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
// Parquet row group size. We will use this value as the value for
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -304,8 +307,7 @@ private[sql] class ParquetRelation(
useMetadataCache,
parquetFilterPushDown,
assumeBinaryIsString,
- assumeInt96IsTimestamp,
- writeLegacyParquetFormat) _
+ assumeInt96IsTimestamp) _
// Create the function to set input paths at the driver side.
val setInputPaths =
@@ -530,8 +532,7 @@ private[sql] object ParquetRelation extends Logging {
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
- assumeInt96IsTimestamp: Boolean,
- writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
+ assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
@@ -552,16 +553,15 @@ private[sql] object ParquetRelation extends Logging {
})
conf.set(
- RowWriteSupport.SPARK_ROW_SCHEMA,
+ CatalystWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(dataSchema).json)
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
- // Sets flags for Parquet schema conversion
+ // Sets flags for `CatalystSchemaConverter`
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
- conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)
overrideMinSplitSize(parquetBlockSize, conf)
}
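
A sketch of the write-path wiring performed above, assuming it runs inside the org.apache.spark.sql.execution.datasources.parquet package (CatalystWriteSupport and SQLConf are not public); the schema and flag values are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val job = new Job()
val conf: Configuration = job.getConfiguration
val dataSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType)))

// Register the new write support and hand it the Catalyst schema plus the flags
// that CatalystSchemaConverter and CatalystWriteSupport read back in init().
ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
CatalystWriteSupport.setSchema(dataSchema, conf)
conf.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "true")
conf.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")
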
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
deleted file mode 100644
index ed89aa27aa..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.math.BigInteger
-import java.nio.{ByteBuffer, ByteOrder}
-import java.util.{HashMap => JHashMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api._
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A `parquet.hadoop.api.WriteSupport` for Row objects.
- */
-private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging {
-
- private[parquet] var writer: RecordConsumer = null
- private[parquet] var attributes: Array[Attribute] = null
-
- override def init(configuration: Configuration): WriteSupport.WriteContext = {
- val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
- val metadata = new JHashMap[String, String]()
- metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr)
-
- if (attributes == null) {
- attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray
- }
-
- log.debug(s"write support initialized for requested schema $attributes")
- new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
- }
-
- override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
- writer = recordConsumer
- log.debug(s"preparing for write with schema $attributes")
- }
-
- override def write(record: InternalRow): Unit = {
- val attributesSize = attributes.size
- if (attributesSize > record.numFields) {
- throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " +
- s"($attributesSize > ${record.numFields})")
- }
-
- var index = 0
- writer.startMessage()
- while(index < attributesSize) {
- // null values indicate optional fields but we do not check currently
- if (!record.isNullAt(index)) {
- writer.startField(attributes(index).name, index)
- writeValue(attributes(index).dataType, record.get(index, attributes(index).dataType))
- writer.endField(attributes(index).name, index)
- }
- index = index + 1
- }
- writer.endMessage()
- }
-
- private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
- if (value != null) {
- schema match {
- case t: UserDefinedType[_] => writeValue(t.sqlType, value)
- case t @ ArrayType(_, _) => writeArray(
- t,
- value.asInstanceOf[CatalystConverter.ArrayScalaType])
- case t @ MapType(_, _, _) => writeMap(
- t,
- value.asInstanceOf[CatalystConverter.MapScalaType])
- case t @ StructType(_) => writeStruct(
- t,
- value.asInstanceOf[CatalystConverter.StructScalaType])
- case _ => writePrimitive(schema.asInstanceOf[AtomicType], value)
- }
- }
- }
-
- private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
- if (value != null) {
- schema match {
- case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
- case ByteType => writer.addInteger(value.asInstanceOf[Byte])
- case ShortType => writer.addInteger(value.asInstanceOf[Short])
- case IntegerType | DateType => writer.addInteger(value.asInstanceOf[Int])
- case LongType => writer.addLong(value.asInstanceOf[Long])
- case TimestampType => writeTimestamp(value.asInstanceOf[Long])
- case FloatType => writer.addFloat(value.asInstanceOf[Float])
- case DoubleType => writer.addDouble(value.asInstanceOf[Double])
- case StringType => writer.addBinary(
- Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes))
- case BinaryType => writer.addBinary(
- Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
- case DecimalType.Fixed(precision, _) =>
- writeDecimal(value.asInstanceOf[Decimal], precision)
- case _ => sys.error(s"Do not know how to writer $schema to consumer")
- }
- }
- }
-
- private[parquet] def writeStruct(
- schema: StructType,
- struct: CatalystConverter.StructScalaType): Unit = {
- if (struct != null) {
- val fields = schema.fields.toArray
- writer.startGroup()
- var i = 0
- while(i < fields.length) {
- if (!struct.isNullAt(i)) {
- writer.startField(fields(i).name, i)
- writeValue(fields(i).dataType, struct.get(i, fields(i).dataType))
- writer.endField(fields(i).name, i)
- }
- i = i + 1
- }
- writer.endGroup()
- }
- }
-
- private[parquet] def writeArray(
- schema: ArrayType,
- array: CatalystConverter.ArrayScalaType): Unit = {
- val elementType = schema.elementType
- writer.startGroup()
- if (array.numElements() > 0) {
- if (schema.containsNull) {
- writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
- var i = 0
- while (i < array.numElements()) {
- writer.startGroup()
- if (!array.isNullAt(i)) {
- writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
- writeValue(elementType, array.get(i, elementType))
- writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
- }
- writer.endGroup()
- i = i + 1
- }
- writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
- } else {
- writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
- var i = 0
- while (i < array.numElements()) {
- writeValue(elementType, array.get(i, elementType))
- i = i + 1
- }
- writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
- }
- }
- writer.endGroup()
- }
-
- private[parquet] def writeMap(
- schema: MapType,
- map: CatalystConverter.MapScalaType): Unit = {
- writer.startGroup()
- val length = map.numElements()
- if (length > 0) {
- writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
- map.foreach(schema.keyType, schema.valueType, (key, value) => {
- writer.startGroup()
- writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
- writeValue(schema.keyType, key)
- writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
- if (value != null) {
- writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
- writeValue(schema.valueType, value)
- writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
- }
- writer.endGroup()
- })
- writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
- }
- writer.endGroup()
- }
-
- // Scratch array used to write decimals as fixed-length byte array
- private[this] var reusableDecimalBytes = new Array[Byte](16)
-
- private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
- val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision)
-
- def longToBinary(unscaled: Long): Binary = {
- var i = 0
- var shift = 8 * (numBytes - 1)
- while (i < numBytes) {
- reusableDecimalBytes(i) = (unscaled >> shift).toByte
- i += 1
- shift -= 8
- }
- Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
- }
-
- def bigIntegerToBinary(unscaled: BigInteger): Binary = {
- unscaled.toByteArray match {
- case bytes if bytes.length == numBytes =>
- Binary.fromByteArray(bytes)
-
- case bytes if bytes.length <= reusableDecimalBytes.length =>
- val signedByte = (if (bytes.head < 0) -1 else 0).toByte
- java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte)
- System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length)
- Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-
- case bytes =>
- reusableDecimalBytes = new Array[Byte](bytes.length)
- bigIntegerToBinary(unscaled)
- }
- }
-
- val binary = if (numBytes <= 8) {
- longToBinary(decimal.toUnscaledLong)
- } else {
- bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue())
- }
-
- writer.addBinary(binary)
- }
-
- // array used to write Timestamp as Int96 (fixed-length binary)
- private[this] val int96buf = new Array[Byte](12)
-
- private[parquet] def writeTimestamp(ts: Long): Unit = {
- val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
- val buf = ByteBuffer.wrap(int96buf)
- buf.order(ByteOrder.LITTLE_ENDIAN)
- buf.putLong(timeOfDayNanos)
- buf.putInt(julianDay)
- writer.addBinary(Binary.fromByteArray(int96buf))
- }
-}
-
-// Optimized for non-nested rows
-private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
- override def write(record: InternalRow): Unit = {
- val attributesSize = attributes.size
- if (attributesSize > record.numFields) {
- throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " +
- s"($attributesSize > ${record.numFields})")
- }
-
- var index = 0
- writer.startMessage()
- while(index < attributesSize) {
- // null values indicate optional fields but we do not check currently
- if (!record.isNullAt(index) && !record.isNullAt(index)) {
- writer.startField(attributes(index).name, index)
- consumeType(attributes(index).dataType, record, index)
- writer.endField(attributes(index).name, index)
- }
- index = index + 1
- }
- writer.endMessage()
- }
-
- private def consumeType(
- ctype: DataType,
- record: InternalRow,
- index: Int): Unit = {
- ctype match {
- case BooleanType => writer.addBoolean(record.getBoolean(index))
- case ByteType => writer.addInteger(record.getByte(index))
- case ShortType => writer.addInteger(record.getShort(index))
- case IntegerType | DateType => writer.addInteger(record.getInt(index))
- case LongType => writer.addLong(record.getLong(index))
- case TimestampType => writeTimestamp(record.getLong(index))
- case FloatType => writer.addFloat(record.getFloat(index))
- case DoubleType => writer.addDouble(record.getDouble(index))
- case StringType =>
- writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes))
- case BinaryType =>
- writer.addBinary(Binary.fromByteArray(record.getBinary(index)))
- case DecimalType.Fixed(precision, scale) =>
- writeDecimal(record.getDecimal(index, precision, scale), precision)
- case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
- }
- }
-}
-
-private[parquet] object RowWriteSupport {
- val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
-
- def getSchema(configuration: Configuration): Seq[Attribute] = {
- val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
- if (schemaString == null) {
- throw new RuntimeException("Missing schema!")
- }
- ParquetTypesConverter.convertFromString(schemaString)
- }
-
- def setSchema(schema: Seq[Attribute], configuration: Configuration) {
- val encoded = ParquetTypesConverter.convertToString(schema)
- configuration.set(SPARK_ROW_SCHEMA, encoded)
- configuration.set(
- ParquetOutputFormat.WRITER_VERSION,
- ParquetProperties.WriterVersion.PARQUET_1_0.toString)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
deleted file mode 100644
index b647bb6116..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.io.IOException
-import java.util.{Collections, Arrays}
-
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types._
-
-
-private[parquet] object ParquetTypesConverter extends Logging {
- def isPrimitiveType(ctype: DataType): Boolean = ctype match {
- case _: NumericType | BooleanType | DateType | TimestampType | StringType | BinaryType => true
- case _ => false
- }
-
- /**
- * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision.
- */
- private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision =>
- var length = 1
- while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
- length += 1
- }
- length
- }
-
- def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
- val converter = new CatalystSchemaConverter()
- converter.convert(StructType.fromAttributes(attributes))
- }
-
- def convertFromString(string: String): Seq[Attribute] = {
- Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
- case s: StructType => s.toAttributes
- case other => sys.error(s"Can convert $string to row")
- }
- }
-
- def convertToString(schema: Seq[Attribute]): String = {
- schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
- StructType.fromAttributes(schema).json
- }
-
- def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
- if (origPath == null) {
- throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
- }
- val fs = origPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException(
- s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
- }
- val path = origPath.makeQualified(fs)
- if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
- throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
- }
- val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fs.exists(metadataPath)) {
- try {
- fs.delete(metadataPath, true)
- } catch {
- case e: IOException =>
- throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath")
- }
- }
- val extraMetadata = new java.util.HashMap[String, String]()
- extraMetadata.put(
- CatalystReadSupport.SPARK_METADATA_KEY,
- ParquetTypesConverter.convertToString(attributes))
- // TODO: add extra data, e.g., table name, date, etc.?
-
- val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes)
- val metaData: FileMetaData = new FileMetaData(
- parquetSchema,
- extraMetadata,
- "Spark")
-
- ParquetFileWriter.writeMetadataFile(
- conf,
- path,
- Arrays.asList(new Footer(path, new ParquetMetadata(metaData, Collections.emptyList()))))
- }
-
- /**
- * Try to read Parquet metadata at the given Path. We first see if there is a summary file
- * in the parent directory. If so, this is used. Else we read the actual footer at the given
- * location.
- * @param origPath The path at which we expect one (or more) Parquet files.
- * @param configuration The Hadoop configuration to use.
- * @return The `ParquetMetadata` containing among other things the schema.
- */
- def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
- if (origPath == null) {
- throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
- }
- val job = new Job()
- val conf = {
- // scalastyle:off jobcontext
- configuration.getOrElse(ContextUtil.getConfiguration(job))
- // scalastyle:on jobcontext
- }
- val fs: FileSystem = origPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
- }
- val path = origPath.makeQualified(fs)
-
- val children =
- fs
- .globStatus(path)
- .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
- .filterNot { status =>
- val name = status.getPath.getName
- (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
- }
-
- // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
- // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
- // single row group to read schema related metadata. Notice that we are making assumptions that
- // all data in a single Parquet file have the same schema, which is normally true.
- children
- // Try any non-"_metadata" file first...
- .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
- // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
- // empty, thus normally the "_metadata" file is expected to be fairly small).
- .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
- .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER))
- .getOrElse(
- throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
- }
-}
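The BYTES_FOR_PRECISION table removed above (a similar helper lives on in CatalystSchemaConverter) answers one question: how many bytes must a FIXED_LEN_BYTE_ARRAY have so that its signed two's-complement range covers every unscaled value of a decimal with the given precision? A self-contained sketch of that computation, with illustrative names:

```scala
// Sketch of the deleted BYTES_FOR_PRECISION lookup (names here are illustrative).
// Entry `p` is the smallest byte length whose signed range can hold any unscaled
// decimal value of precision `p`, i.e. the smallest `length` with
// 2^(8 * length - 1) >= 10^p.
object DecimalByteLengths {
  val bytesForPrecision: Array[Int] = Array.tabulate(39) { precision =>
    var length = 1
    while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
      length += 1
    }
    length
  }

  def main(args: Array[String]): Unit = {
    println(bytesForPrecision(9))   // 4  -> such decimals also fit in an INT32
    println(bytesForPrecision(18))  // 8  -> such decimals also fit in an INT64
    println(bytesForPrecision(38))  // 16 -> the maximum precision Spark SQL supports
  }
}
```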
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 7992fd59ff..d17671d48a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -24,6 +24,7 @@ import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{OpenHashSetUDT, HyperLogLogUDT}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -68,7 +69,7 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
private[spark] override def asNullable: MyDenseVectorUDT = this
}
-class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
+class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetTest {
import testImplicits._
private lazy val pointsRDD = Seq(
@@ -98,17 +99,28 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
Seq(Row(true), Row(true)))
}
-
- test("UDTs with Parquet") {
- val tempDir = Utils.createTempDir()
- tempDir.delete()
- pointsRDD.write.parquet(tempDir.getCanonicalPath)
+ testStandardAndLegacyModes("UDTs with Parquet") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ pointsRDD.write.parquet(path)
+ checkAnswer(
+ sqlContext.read.parquet(path),
+ Seq(
+ Row(1.0, new MyDenseVector(Array(0.1, 1.0))),
+ Row(0.0, new MyDenseVector(Array(0.2, 2.0)))))
+ }
}
- test("Repartition UDTs with Parquet") {
- val tempDir = Utils.createTempDir()
- tempDir.delete()
- pointsRDD.repartition(1).write.parquet(tempDir.getCanonicalPath)
+ testStandardAndLegacyModes("Repartition UDTs with Parquet") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ pointsRDD.repartition(1).write.parquet(path)
+ checkAnswer(
+ sqlContext.read.parquet(path),
+ Seq(
+ Row(1.0, new MyDenseVector(Array(0.1, 1.0))),
+ Row(0.0, new MyDenseVector(Array(0.2, 2.0)))))
+ }
}
// Tests to make sure that all operators correctly convert types on the way out.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index cd552e8337..599cf948e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -28,10 +28,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.{Group, GroupWriter}
+import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{BlockMetaData, CompressionCodecName, FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -99,16 +99,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
}
- test("fixed-length decimals") {
- def makeDecimalRDD(decimal: DecimalType): DataFrame =
- sparkContext
- .parallelize(0 to 1000)
- .map(i => Tuple1(i / 100.0))
- .toDF()
- // Parquet doesn't allow column names with spaces, have to add an alias here
- .select($"_1" cast decimal as "dec")
+ testStandardAndLegacyModes("fixed-length decimals") {
+ def makeDecimalRDD(decimal: DecimalType): DataFrame = {
+ sqlContext
+ .range(1000)
+ // Parquet doesn't allow column names with spaces, have to add an alias here.
+ // Minus 500 here so that negative decimals are also tested.
+ .select((('id - 500) / 100.0) cast decimal as 'dec)
+ .coalesce(1)
+ }
- for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))) {
+ val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))
+ for ((precision, scale) <- combinations) {
withTempPath { dir =>
val data = makeDecimalRDD(DecimalType(precision, scale))
data.write.parquet(dir.getCanonicalPath)
@@ -132,22 +134,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- test("map") {
+ testStandardAndLegacyModes("map") {
val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
checkParquetFile(data)
}
- test("array") {
+ testStandardAndLegacyModes("array") {
val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
checkParquetFile(data)
}
- test("array and double") {
+ testStandardAndLegacyModes("array and double") {
val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
checkParquetFile(data)
}
- test("struct") {
+ testStandardAndLegacyModes("struct") {
val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
@@ -157,7 +159,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- test("nested struct with array of array as field") {
+ testStandardAndLegacyModes("nested struct with array of array as field") {
val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
@@ -167,7 +169,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- test("nested map with struct as value type") {
+ testStandardAndLegacyModes("nested map with struct as value type") {
val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
withParquetDataFrame(data) { df =>
checkAnswer(df, data.map { case Tuple1(m) =>
@@ -205,14 +207,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
test("compression codec") {
- def compressionCodecFor(path: String): String = {
- val codecs = ParquetTypesConverter
- .readMetaData(new Path(path), Some(hadoopConfiguration)).getBlocks.asScala
- .flatMap(_.getColumns.asScala)
- .map(_.getCodec.name())
- .distinct
-
- assert(codecs.size === 1)
+ def compressionCodecFor(path: String, codecName: String): String = {
+ val codecs = for {
+ footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConfiguration)
+ block <- footer.getParquetMetadata.getBlocks.asScala
+ column <- block.getColumns.asScala
+ } yield column.getCodec.name()
+
+ assert(codecs.distinct === Seq(codecName))
codecs.head
}
@@ -222,7 +224,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
- compressionCodecFor(path)
+ compressionCodecFor(path, codec.name())
}
}
}
@@ -278,15 +280,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { file =>
val path = new Path(file.toURI.toString)
val fs = FileSystem.getLocal(hadoopConfiguration)
- val attributes = ScalaReflection.attributesFor[(Int, String)]
- ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration)
+ val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)])
+ writeMetadata(schema, path, hadoopConfiguration)
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
- val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration))
- val actualSchema = metaData.getFileMetaData.getSchema
- val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+ val expectedSchema = new CatalystSchemaConverter().convert(schema)
+ val actualSchema = readFooter(path, hadoopConfiguration).getFileMetaData.getSchema
actualSchema.checkContains(expectedSchema)
expectedSchema.checkContains(actualSchema)
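The (precision, scale) combinations in the fixed-length decimals test above deliberately straddle the boundaries where the writer switches physical types: roughly, the standard mode packs small decimals into INT32/INT64, while legacy mode always uses fixed-length byte arrays. A hedged sketch of that mapping (illustration only; the authoritative rules live in CatalystSchemaConverter):

```scala
// Illustration, not the Spark implementation: the physical Parquet type a decimal
// column roughly ends up with, depending on spark.sql.parquet.writeLegacyFormat.
def decimalPhysicalType(precision: Int, writeLegacyFormat: Boolean): String = {
  require(precision >= 1 && precision <= 38, "Spark SQL supports decimal precision 1..38")
  if (writeLegacyFormat) {
    "FIXED_LEN_BYTE_ARRAY"   // 1.4-compatible layout, used for every precision
  } else if (precision <= 9) {
    "INT32"                  // unscaled value fits into 4 bytes
  } else if (precision <= 18) {
    "INT64"                  // unscaled value fits into 8 bytes
  } else {
    "FIXED_LEN_BYTE_ARRAY"   // larger decimals still need a fixed-length binary
  }
}

// e.g. decimalPhysicalType(5,  writeLegacyFormat = false) == "INT32"
//      decimalPhysicalType(19, writeLegacyFormat = false) == "FIXED_LEN_BYTE_ARRAY"
//      decimalPhysicalType(5,  writeLegacyFormat = true)  == "FIXED_LEN_BYTE_ARRAY"
```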
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 1c1cfa34ad..cc02ef81c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -484,7 +484,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
- test("SPARK-10301 requested schema clipping - UDT") {
+ testStandardAndLegacyModes("SPARK-10301 requested schema clipping - UDT") {
withTempPath { dir =>
val path = dir.getCanonicalPath
@@ -517,6 +517,50 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
Row(Row(NestedStruct(1, 2L, 3.5D))))
}
}
+
+ test("expand UDT in StructType") {
+ val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
+ val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)
+ assert(CatalystReadSupport.expandUDT(schema) === expected)
+ }
+
+ test("expand UDT in ArrayType") {
+ val schema = new StructType().add(
+ "n",
+ ArrayType(
+ elementType = new NestedStructUDT,
+ containsNull = false),
+ nullable = true)
+
+ val expected = new StructType().add(
+ "n",
+ ArrayType(
+ elementType = new NestedStructUDT().sqlType,
+ containsNull = false),
+ nullable = true)
+
+ assert(CatalystReadSupport.expandUDT(schema) === expected)
+ }
+
+ test("expand UDT in MapType") {
+ val schema = new StructType().add(
+ "n",
+ MapType(
+ keyType = IntegerType,
+ valueType = new NestedStructUDT,
+ valueContainsNull = false),
+ nullable = true)
+
+ val expected = new StructType().add(
+ "n",
+ MapType(
+ keyType = IntegerType,
+ valueType = new NestedStructUDT().sqlType,
+ valueContainsNull = false),
+ nullable = true)
+
+ assert(CatalystReadSupport.expandUDT(schema) === expected)
+ }
}
object TestingUDT {
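The three tests added above pin down the behavior of CatalystReadSupport.expandUDT: every UserDefinedType in a requested schema is replaced by its underlying sqlType, recursively through structs, arrays, and maps. A minimal sketch of that behavior (an illustration, not the actual implementation):

```scala
import org.apache.spark.sql.types._

// Illustrative recursion only; CatalystReadSupport.expandUDT is the real thing.
def expandUDT(dataType: DataType): DataType = dataType match {
  case udt: UserDefinedType[_] =>
    expandUDT(udt.sqlType)
  case StructType(fields) =>
    StructType(fields.map(f => f.copy(dataType = expandUDT(f.dataType))))
  case ArrayType(elementType, containsNull) =>
    ArrayType(expandUDT(elementType), containsNull)
  case MapType(keyType, valueType, valueContainsNull) =>
    MapType(expandUDT(keyType), expandUDT(valueType), valueContainsNull)
  case other =>
    other
}
```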
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index f17fb36f25..60fa81b1ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -357,8 +357,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}"""
// scalastyle:on
- val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString)
- val fromJson = ParquetTypesConverter.convertFromString(jsonString)
+ val fromCaseClassString = StructType.fromString(caseClassString)
+ val fromJson = StructType.fromString(jsonString)
(fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
assert(a.name == b.name)
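For reference, the JSON literal in this test is exactly what StructType#json produces, and StructType.fromString now accepts both that form and the older case-class string form. A small round-trip sketch using only the schema JSON APIs (the variable names are made up):

```scala
import org.apache.spark.sql.types._

// Build the schema that the jsonString literal above describes.
val schema = new StructType()
  .add("c1", IntegerType, nullable = false)
  .add("c2", BinaryType, nullable = true)

// StructType#json serializes to the JSON form; DataType.fromJson parses it back.
val parsed = DataType.fromJson(schema.json).asInstanceOf[StructType]
assert(parsed === schema)
```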
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 442fafb12f..9840ad919e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -19,11 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+
import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SaveMode}
/**
* A helper trait that provides convenient facilities for Parquet testing.
@@ -97,4 +105,38 @@ private[sql] trait ParquetTest extends SQLTestUtils {
assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
partDir
}
+
+ protected def writeMetadata(
+ schema: StructType, path: Path, configuration: Configuration): Unit = {
+ val parquetSchema = new CatalystSchemaConverter().convert(schema)
+ val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
+ val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
+ val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
+ val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)
+ val footer = new Footer(path, parquetMetadata)
+ ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
+ }
+
+ protected def readAllFootersWithoutSummaryFiles(
+ path: Path, configuration: Configuration): Seq[Footer] = {
+ val fs = path.getFileSystem(configuration)
+ ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq
+ }
+
+ protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = {
+ ParquetFileReader.readFooter(
+ configuration,
+ new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE),
+ ParquetMetadataConverter.NO_FILTER)
+ }
+
+ protected def testStandardAndLegacyModes(testName: String)(f: => Unit): Unit = {
+ test(s"Standard mode - $testName") {
+ withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { f }
+ }
+
+ test(s"Legacy mode - $testName") {
+ withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { f }
+ }
+ }
}
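With the helper above, each call site writes one test body but gets two ScalaTest cases, one per value of spark.sql.parquet.writeLegacyFormat. A usage sketch inside a suite that mixes in ParquetTest and SharedSQLContext (the test name and data are made up):

```scala
// Expands into "Standard mode - round-trip ints" and "Legacy mode - round-trip ints";
// the body itself is oblivious to which layout is being exercised.
testStandardAndLegacyModes("round-trip ints") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    sqlContext.range(10).write.parquet(path)
    assert(sqlContext.read.parquet(path).count() === 10)
  }
}
```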
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 107457f79e..d63f3d3996 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
+import org.apache.spark.sql.{SQLConf, QueryTest, Row, SaveMode}
class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton {
import hiveContext.implicits._
@@ -74,11 +74,13 @@ class DataSourceWithHiveMetastoreCatalogSuite
).foreach { case (provider, (inputFormat, outputFormat, serde)) =>
test(s"Persist non-partitioned $provider relation into metastore as managed table") {
withTable("t") {
- testDF
- .write
- .mode(SaveMode.Overwrite)
- .format(provider)
- .saveAsTable("t")
+ withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") {
+ testDF
+ .write
+ .mode(SaveMode.Overwrite)
+ .format(provider)
+ .saveAsTable("t")
+ }
val hiveTable = catalog.client.getTable("default", "t")
assert(hiveTable.inputFormat === Some(inputFormat))
@@ -102,12 +104,14 @@ class DataSourceWithHiveMetastoreCatalogSuite
withTable("t") {
val path = dir.getCanonicalFile
- testDF
- .write
- .mode(SaveMode.Overwrite)
- .format(provider)
- .option("path", path.toString)
- .saveAsTable("t")
+ withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") {
+ testDF
+ .write
+ .mode(SaveMode.Overwrite)
+ .format(provider)
+ .option("path", path.toString)
+ .saveAsTable("t")
+ }
val hiveTable = catalog.client.getTable("default", "t")
assert(hiveTable.inputFormat === Some(inputFormat))
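Outside the test harness, the same compatibility knob can be flipped around a single write when a downstream consumer (here, Hive's built-in Parquet support) still expects the old layout. A minimal sketch, assuming an existing sqlContext and DataFrame df; the output path is illustrative:

```scala
import org.apache.spark.sql.SaveMode

// Write with the 1.4-compatible layout, then restore the new default.
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
try {
  df.write.mode(SaveMode.Overwrite).parquet("/tmp/legacy_layout_table")
} finally {
  sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
}
```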