14 files changed, 120 insertions(+), 123 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index b5b2a681e9..62e09d2e0c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -141,7 +141,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Map[String, String] = { - def computeNumFeatures(): Int = { + val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse { val dataFiles = files.filterNot(_.getPath.getName startsWith "_") val path = if (dataFiles.length == 1) { dataFiles.head.getPath.toUri.toString @@ -156,10 +156,6 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister { MLUtils.computeNumFeatures(parsed) } - val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse { - computeNumFeatures() - } - new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString)) } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 14626e54b2..d823275d85 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -136,7 +136,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo ReadSupport.ReadContext readContext = readSupport.init(new InitContext( taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema)); this.requestedSchema = readContext.getRequestedSchema(); - this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema); + this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema); this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns()); for (BlockMetaData block : blocks) { this.totalRowCount += block.getRowCount(); @@ -195,12 +195,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo } builder.addFields(fileSchema.getType(s)); } - this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME()); + this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME()); } else { - this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE(); + this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE(); } } - this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema); + this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema); this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns()); for (BlockMetaData block : blocks) { this.totalRowCount += block.getRowCount(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index ea37a08ab5..662a03d3b5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -259,7 +259,7 @@ public class VectorizedColumnReader { for (int i = rowId; i < rowId + num; ++i) { // TODO: Convert dictionary of Binaries to dictionary of Longs Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); } } else { throw new NotImplementedException(); @@ -280,12 +280,12 @@ public class VectorizedColumnReader { if (DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v)); + column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); } } else if (DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v)); + column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); } } else if (DecimalType.isByteArrayDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { @@ -375,7 +375,7 @@ public class VectorizedColumnReader { if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, // Read 12 bytes for INT96 - CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12))); + ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12))); } else { column.putNull(rowId + i); } @@ -394,7 +394,7 @@ public class VectorizedColumnReader { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { column.putInt(rowId + i, - (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); + (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); } else { column.putNull(rowId + i); } @@ -403,7 +403,7 @@ public class VectorizedColumnReader { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, - CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); + ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 9c1898994c..641c5cb02b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -207,6 +207,20 @@ trait FileFormat { dataSchema: StructType): OutputWriterFactory /** + * Returns a [[OutputWriterFactory]] for generating output writers that can write data. + * This method is current used only by FileStreamSinkWriter to generate output writers that + * does not use output committers to write data. The OutputWriter generated by the returned + * [[OutputWriterFactory]] must implement the method `newWriter(path)`.. 
+ */ + def buildWriter( + sqlContext: SQLContext, + dataSchema: StructType, + options: Map[String, String]): OutputWriterFactory = { + // TODO: Remove this default implementation when the other formats have been ported + throw new UnsupportedOperationException(s"buildWriter is not supported for $this") + } + + /** * Returns whether this format support returning columnar batch or not. * * TODO: we should just have different traits for the different formats. @@ -293,19 +307,6 @@ trait FileFormat { } } - /** - * Returns a [[OutputWriterFactory]] for generating output writers that can write data. - * This method is current used only by FileStreamSinkWriter to generate output writers that - * does not use output committers to write data. The OutputWriter generated by the returned - * [[OutputWriterFactory]] must implement the method `newWriter(path)`.. - */ - def buildWriter( - sqlContext: SQLContext, - dataSchema: StructType, - options: Map[String, String]): OutputWriterFactory = { - // TODO: Remove this default implementation when the other formats have been ported - throw new UnsupportedOperationException(s"buildWriter is not supported for $this") - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 67bfd39697..cf974afb26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.minBytesForPrecision +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -82,8 +82,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi } this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) - val messageType = new CatalystSchemaConverter(configuration).convert(schema) - val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava + val messageType = new ParquetSchemaConverter(configuration).convert(schema) + val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava logInfo( s"""Initialized Parquet WriteSupport with Catalyst schema: @@ -427,7 +427,7 @@ private[parquet] object CatalystWriteSupport { val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" def setSchema(schema: StructType, configuration: Configuration): Unit = { - schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) + schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName) configuration.set(SPARK_ROW_SCHEMA, schema.json) configuration.setIfUnset( ParquetOutputFormat.WRITER_VERSION, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 6b25e36f7b..2cce3db9a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -288,13 +288,13 @@ private[sql] class ParquetFileFormat filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( - CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - CatalystSchemaConverter.checkFieldNames(requiredSchema).json) + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetSchemaConverter.checkFieldNames(requiredSchema).json) hadoopConf.set( CatalystWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(requiredSchema).json) + ParquetSchemaConverter.checkFieldNames(requiredSchema).json) // We want to clear this temporary metadata from saving into Parquet file. // This metadata is only useful for detecting optional columns when pushdowning filters. @@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat val reader = pushed match { case Some(filter) => new ParquetRecordReader[InternalRow]( - new CatalystReadSupport, + new ParquetReadSupport, FilterCompat.get(filter, null)) case _ => - new ParquetRecordReader[InternalRow](new CatalystReadSupport) + new ParquetRecordReader[InternalRow](new ParquetReadSupport) } reader.initialize(split, hadoopAttemptContext) reader @@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging { assumeBinaryIsString: Boolean, assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { val conf = job.getConfiguration - conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) + conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) // Try to push down filters when filter push-down is enabled. if (parquetFilterPushDown) { @@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging { .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) } - conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { + conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - CatalystSchemaConverter.checkFieldNames(requestedSchema).json + ParquetSchemaConverter.checkFieldNames(requestedSchema).json }) conf.set( CatalystWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) + ParquetSchemaConverter.checkFieldNames(dataSchema).json) // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) @@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging { footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { def parseParquetSchema(schema: MessageType): StructType = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( sparkSession.sessionState.conf.isParquetBinaryAsString, sparkSession.sessionState.conf.isParquetBinaryAsString, sparkSession.sessionState.conf.writeLegacyParquetFormat) @@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging { val serializedSchema = metadata .getKeyValueMetaData .asScala.toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) + .get(ParquetReadSupport.SPARK_METADATA_KEY) if (serializedSchema.isEmpty) { // Falls back to Parquet schema if no Spark SQL schema found. 
Some(parseParquetSchema(metadata.getSchema)) @@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging { // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = - new CatalystSchemaConverter( + new ParquetSchemaConverter( assumeBinaryIsString = assumeBinaryIsString, assumeInt96IsTimestamp = assumeInt96IsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) @@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging { * a [[StructType]] converted from the [[MessageType]] stored in this footer. */ def readSchemaFromFooter( - footer: Footer, converter: CatalystSchemaConverter): StructType = { + footer: Footer, converter: ParquetSchemaConverter): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData .asScala.toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) + .get(ParquetReadSupport.SPARK_METADATA_KEY) .flatMap(deserializeSchemaString) .getOrElse(converter.convert(fileMetaData.getSchema)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 9c885b252f..e6ef634421 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { +private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging { private var catalystRequestedSchema: StructType = _ /** @@ -58,13 +58,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with override def init(context: InitContext): ReadContext = { catalystRequestedSchema = { val conf = context.getConfiguration - val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) + val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA) assert(schemaString != null, "Parquet requested schema not set.") StructType.fromString(schemaString) } val parquetRequestedSchema = - CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) + ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava) } @@ -92,13 +92,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with """.stripMargin } - new CatalystRecordMaterializer( + new ParquetRecordMaterializer( parquetRequestedSchema, - CatalystReadSupport.expandUDT(catalystRequestedSchema)) + ParquetReadSupport.expandUDT(catalystRequestedSchema)) } } -private[parquet] object CatalystReadSupport { +private[parquet] object ParquetReadSupport { val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" @@ -110,12 +110,12 @@ private[parquet] object CatalystReadSupport { def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), 
catalystSchema) if (clippedParquetFields.isEmpty) { - CatalystSchemaConverter.EMPTY_MESSAGE + ParquetSchemaConverter.EMPTY_MESSAGE } else { Types .buildMessage() .addFields(clippedParquetFields: _*) - .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) } } @@ -269,7 +269,7 @@ private[parquet] object CatalystReadSupport { private def clipParquetGroupFields( parquetRecord: GroupType, structType: StructType): Seq[Type] = { val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false) + val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false) structType.map { f => parquetFieldMap .get(f.name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index eeead9f5d8..0818d802b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -29,11 +29,11 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed */ -private[parquet] class CatalystRecordMaterializer( +private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType) extends RecordMaterializer[InternalRow] { - private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater) + private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 85b0bc17ed..9dad59647e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -70,7 +70,7 @@ private[parquet] trait HasParentContainerUpdater { /** * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]]. */ -private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater) +private[parquet] abstract class ParquetGroupConverter(val updater: ParentContainerUpdater) extends GroupConverter with HasParentContainerUpdater /** @@ -78,7 +78,7 @@ private[parquet] abstract class CatalystGroupConverter(val updater: ParentContai * are handled by this converter. Parquet primitive types are only a subset of those of Spark * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. 
*/ -private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater) +private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater) extends PrimitiveConverter with HasParentContainerUpdater { override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) @@ -90,7 +90,7 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp } /** - * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s. + * A [[ParquetRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s. * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root * converter. Take the following Parquet type as an example: * {{{ @@ -104,11 +104,11 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp * }}} * 5 converters will be created: * - * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains: - * - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and - * - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains: - * - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and - * - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22` + * - a root [[ParquetRowConverter]] for [[MessageType]] `root`, which contains: + * - a [[ParquetPrimitiveConverter]] for required [[INT_32]] field `f1`, and + * - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains: + * - a [[ParquetPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and + * - a [[ParquetStringConverter]] for optional [[UTF8]] string field `f22` * * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have * any "parent" container. @@ -118,11 +118,11 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp * types should have been expanded. * @param updater An updater which propagates converted field values to the parent container */ -private[parquet] class CatalystRowConverter( +private[parquet] class ParquetRowConverter( parquetType: GroupType, catalystType: StructType, updater: ParentContainerUpdater) - extends CatalystGroupConverter(updater) with Logging { + extends ParquetGroupConverter(updater) with Logging { assert( parquetType.getFieldCount == catalystType.length, @@ -150,7 +150,7 @@ private[parquet] class CatalystRowConverter( """.stripMargin) /** - * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates + * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates * converted filed values to the `ordinal`-th cell in `currentRow`. 
*/ private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { @@ -213,33 +213,33 @@ private[parquet] class CatalystRowConverter( catalystType match { case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => - new CatalystPrimitiveConverter(updater) + new ParquetPrimitiveConverter(updater) case ByteType => - new CatalystPrimitiveConverter(updater) { + new ParquetPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setByte(value.asInstanceOf[ByteType#InternalType]) } case ShortType => - new CatalystPrimitiveConverter(updater) { + new ParquetPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setShort(value.asInstanceOf[ShortType#InternalType]) } // For INT32 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => - new CatalystIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater) + new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater) // For INT64 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => - new CatalystLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater) + new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater) // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY || parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY => - new CatalystBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater) + new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater) case t: DecimalType => throw new RuntimeException( @@ -248,11 +248,11 @@ private[parquet] class CatalystRowConverter( "FIXED_LEN_BYTE_ARRAY, or BINARY.") case StringType => - new CatalystStringConverter(updater) + new ParquetStringConverter(updater) case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. - new CatalystPrimitiveConverter(updater) { + new ParquetPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { assert( @@ -268,7 +268,7 @@ private[parquet] class CatalystRowConverter( } case DateType => - new CatalystPrimitiveConverter(updater) { + new ParquetPrimitiveConverter(updater) { override def addInt(value: Int): Unit = { // DateType is not specialized in `SpecificMutableRow`, have to box it here. updater.set(value.asInstanceOf[DateType#InternalType]) @@ -286,13 +286,13 @@ private[parquet] class CatalystRowConverter( } case t: ArrayType => - new CatalystArrayConverter(parquetType.asGroupType(), t, updater) + new ParquetArrayConverter(parquetType.asGroupType(), t, updater) case t: MapType => - new CatalystMapConverter(parquetType.asGroupType(), t, updater) + new ParquetMapConverter(parquetType.asGroupType(), t, updater) case t: StructType => - new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater { + new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) @@ -306,8 +306,8 @@ private[parquet] class CatalystRowConverter( /** * Parquet converter for strings. A dictionary is used to minimize string decoding cost. 
*/ - private final class CatalystStringConverter(updater: ParentContainerUpdater) - extends CatalystPrimitiveConverter(updater) { + private final class ParquetStringConverter(updater: ParentContainerUpdater) + extends ParquetPrimitiveConverter(updater) { private var expandedDictionary: Array[UTF8String] = null @@ -337,9 +337,9 @@ private[parquet] class CatalystRowConverter( /** * Parquet converter for fixed-precision decimals. */ - private abstract class CatalystDecimalConverter( + private abstract class ParquetDecimalConverter( precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystPrimitiveConverter(updater) { + extends ParquetPrimitiveConverter(updater) { protected var expandedDictionary: Array[Decimal] = _ @@ -371,7 +371,7 @@ private[parquet] class CatalystRowConverter( protected def decimalFromBinary(value: Binary): Decimal = { if (precision <= Decimal.MAX_LONG_DIGITS) { // Constructs a `Decimal` with an unscaled `Long` value if possible. - val unscaled = CatalystRowConverter.binaryToUnscaledLong(value) + val unscaled = ParquetRowConverter.binaryToUnscaledLong(value) Decimal(unscaled, precision, scale) } else { // Otherwise, resorts to an unscaled `BigInteger` instead. @@ -380,9 +380,9 @@ private[parquet] class CatalystRowConverter( } } - private class CatalystIntDictionaryAwareDecimalConverter( + private class ParquetIntDictionaryAwareDecimalConverter( precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystDecimalConverter(precision, scale, updater) { + extends ParquetDecimalConverter(precision, scale, updater) { override def setDictionary(dictionary: Dictionary): Unit = { this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => @@ -391,9 +391,9 @@ private[parquet] class CatalystRowConverter( } } - private class CatalystLongDictionaryAwareDecimalConverter( + private class ParquetLongDictionaryAwareDecimalConverter( precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystDecimalConverter(precision, scale, updater) { + extends ParquetDecimalConverter(precision, scale, updater) { override def setDictionary(dictionary: Dictionary): Unit = { this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => @@ -402,9 +402,9 @@ private[parquet] class CatalystRowConverter( } } - private class CatalystBinaryDictionaryAwareDecimalConverter( + private class ParquetBinaryDictionaryAwareDecimalConverter( precision: Int, scale: Int, updater: ParentContainerUpdater) - extends CatalystDecimalConverter(precision, scale, updater) { + extends ParquetDecimalConverter(precision, scale, updater) { override def setDictionary(dictionary: Dictionary): Unit = { this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id => @@ -431,11 +431,11 @@ private[parquet] class CatalystRowConverter( * * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists */ - private final class CatalystArrayConverter( + private final class ParquetArrayConverter( parquetSchema: GroupType, catalystSchema: ArrayType, updater: ParentContainerUpdater) - extends CatalystGroupConverter(updater) { + extends ParquetGroupConverter(updater) { private var currentArray: ArrayBuffer[Any] = _ @@ -512,11 +512,11 @@ private[parquet] class CatalystRowConverter( } /** Parquet converter for maps */ - private final class CatalystMapConverter( + private final class ParquetMapConverter( parquetType: GroupType, catalystType: MapType, updater: ParentContainerUpdater) - extends CatalystGroupConverter(updater) { + extends 
ParquetGroupConverter(updater) { private var currentKeys: ArrayBuffer[Any] = _ private var currentValues: ArrayBuffer[Any] = _ @@ -638,7 +638,7 @@ private[parquet] class CatalystRowConverter( } } -private[parquet] object CatalystRowConverter { +private[parquet] object ParquetRowConverter { def binaryToUnscaledLong(binary: Binary): Long = { // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 3688c3e2b5..bcf535d455 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -26,7 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition._ import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.maxPrecisionForBytes +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.maxPrecisionForBytes import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -52,7 +52,7 @@ import org.apache.spark.sql.types._ * When set to false, use standard format defined in parquet-format spec. This argument only * affects Parquet write path. */ -private[parquet] class CatalystSchemaConverter( +private[parquet] class ParquetSchemaConverter( assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) { @@ -125,7 +125,7 @@ private[parquet] class CatalystSchemaConverter( val precision = field.getDecimalMetadata.getPrecision val scale = field.getDecimalMetadata.getScale - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)") @@ -163,7 +163,7 @@ private[parquet] class CatalystSchemaConverter( } case INT96 => - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( assumeInt96IsTimestamp, "INT96 is not supported unless it's interpreted as timestamp. 
" + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") @@ -206,11 +206,11 @@ private[parquet] class CatalystSchemaConverter( // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists case LIST => - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1, s"Invalid list type $field") val repeatedType = field.getType(0) - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( repeatedType.isRepetition(REPEATED), s"Invalid list type $field") if (isElementType(repeatedType, field.getName)) { @@ -226,17 +226,17 @@ private[parquet] class CatalystSchemaConverter( // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 // scalastyle:on case MAP | MAP_KEY_VALUE => - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1 && !field.getType(0).isPrimitive, s"Invalid map type: $field") val keyValueType = field.getType(0).asGroupType() - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2, s"Invalid map type: $field") val keyType = keyValueType.getType(0) - CatalystSchemaConverter.checkConversionRequirement( + ParquetSchemaConverter.checkConversionRequirement( keyType.isPrimitive, s"Map key type is expected to be a primitive type, but found: $keyType") @@ -311,7 +311,7 @@ private[parquet] class CatalystSchemaConverter( Types .buildMessage() .addFields(catalystSchema.map(convertField): _*) - .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) } /** @@ -322,7 +322,7 @@ private[parquet] class CatalystSchemaConverter( } private def convertField(field: StructField, repetition: Type.Repetition): Type = { - CatalystSchemaConverter.checkFieldName(field.name) + ParquetSchemaConverter.checkFieldName(field.name) field.dataType match { // =================== @@ -394,7 +394,7 @@ private[parquet] class CatalystSchemaConverter( .as(DECIMAL) .precision(precision) .scale(scale) - .length(CatalystSchemaConverter.minBytesForPrecision(precision)) + .length(ParquetSchemaConverter.minBytesForPrecision(precision)) .named(field.name) // ======================== @@ -428,7 +428,7 @@ private[parquet] class CatalystSchemaConverter( .as(DECIMAL) .precision(precision) .scale(scale) - .length(CatalystSchemaConverter.minBytesForPrecision(precision)) + .length(ParquetSchemaConverter.minBytesForPrecision(precision)) .named(field.name) // =================================== @@ -535,7 +535,7 @@ private[parquet] class CatalystSchemaConverter( } } -private[parquet] object CatalystSchemaConverter { +private[parquet] object ParquetSchemaConverter { val SPARK_PARQUET_SCHEMA_NAME = "spark_schema" // !! HACK ALERT !! 
@@ -551,7 +551,7 @@ private[parquet] object CatalystSchemaConverter { val EMPTY_MESSAGE = Types .buildMessage() .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy") - .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) EMPTY_MESSAGE.getFields.clear() def checkFieldName(name: String): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 92f2db325c..fc9ce6bb30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -362,7 +362,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val expectedSchema = new CatalystSchemaConverter().convert(schema) + val expectedSchema = new ParquetSchemaConverter().convert(schema) val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema actualSchema.checkContains(expectedSchema) @@ -432,7 +432,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) withTempPath { location => - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) + val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val path = new Path(location.getCanonicalPath) val conf = spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf, extraMetadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index b4fd0ef6ed..83d10010f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -574,7 +574,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("expand UDT in StructType") { val schema = new StructType().add("n", new NestedStructUDT, nullable = true) val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true) - assert(CatalystReadSupport.expandUDT(schema) === expected) + assert(ParquetReadSupport.expandUDT(schema) === expected) } test("expand UDT in ArrayType") { @@ -592,7 +592,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext containsNull = false), nullable = true) - assert(CatalystReadSupport.expandUDT(schema) === expected) + assert(ParquetReadSupport.expandUDT(schema) === expected) } test("expand UDT in MapType") { @@ -612,7 +612,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext valueContainsNull = false), nullable = true) - assert(CatalystReadSupport.expandUDT(schema) === expected) + assert(ParquetReadSupport.expandUDT(schema) === expected) } test("returning batch for wide table") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 
865bfa24c5..8a980a7eb5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -54,7 +54,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { binaryAsString: Boolean, int96AsTimestamp: Boolean, writeLegacyParquetFormat: Boolean): Unit = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) @@ -78,7 +78,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { binaryAsString: Boolean, int96AsTimestamp: Boolean, writeLegacyParquetFormat: Boolean): Unit = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) @@ -1062,7 +1062,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { catalystSchema: StructType, expectedSchema: MessageType): Unit = { test(s"Clipping - $testName") { - val actual = CatalystReadSupport.clipParquetSchema( + val actual = ParquetReadSupport.clipParquetSchema( MessageTypeParser.parseMessageType(parquetSchema), catalystSchema) try { @@ -1424,7 +1424,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { catalystSchema = new StructType(), - expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE) + expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE) testSchemaClipping( "disjoint field sets", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 1953d6fa5a..9fb34e03cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -124,8 +124,8 @@ private[sql] trait ParquetTest extends SQLTestUtils { protected def writeMetadata( schema: StructType, path: Path, configuration: Configuration): Unit = { - val parquetSchema = new CatalystSchemaConverter().convert(schema) - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava + val parquetSchema = new ParquetSchemaConverter().convert(schema) + val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}" val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy) val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava) |
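
The LibSVMFileFormat hunk at the top folds the old computeNumFeatures() helper into the numFeatures lookup itself, so the file scan only runs when a positive "numFeatures" option is not supplied. Below is a minimal standalone sketch of that option-with-fallback pattern, assuming a plain Map for the options and a by-name fallback; the object and method names are illustrative and not taken from the diff:

object NumFeaturesFallbackSketch {
  // Use the positive "numFeatures" option when present; otherwise run the
  // (potentially expensive) fallback computation, and only then.
  def resolveNumFeatures(options: Map[String, String])(compute: => Int): Int =
    options.get("numFeatures").map(_.toInt).filter(_ > 0).getOrElse(compute)

  def main(args: Array[String]): Unit = {
    // Option present: the fallback is never evaluated.
    println(resolveNumFeatures(Map("numFeatures" -> "10"))(sys.error("not reached")))
    // Option absent: the fallback supplies the value.
    println(resolveNumFeatures(Map.empty)(784))
  }
}

The actual change keeps the original expression shape -- options.get("numFeatures").filter(_.toInt > 0).getOrElse { ... } -- and stringifies the result before storing it back into the options map as before.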
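
Most of the remaining hunks are a mechanical rename of the Parquet support classes from the Catalyst* prefix to Parquet* (ParquetReadSupport, ParquetRecordMaterializer, ParquetRowConverter, ParquetSchemaConverter and their nested converters), with call sites in SpecificParquetRecordReaderBase, VectorizedColumnReader, ParquetFileFormat and the test suites updated to match. The following is a hedged sketch of driving the renamed schema converter the way ParquetIOSuite and ParquetTest do; because the class is private[parquet], the sketch assumes it sits in that package, and the flag values and schema are illustrative:

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object ParquetSchemaConverterSketch {
  def main(args: Array[String]): Unit = {
    // Same named flags the test helpers pass in ParquetSchemaSuite.
    val converter = new ParquetSchemaConverter(
      assumeBinaryIsString = false,
      assumeInt96IsTimestamp = true,
      writeLegacyParquetFormat = false)

    val catalystSchema = new StructType()
      .add("id", LongType, nullable = false)
      .add("name", StringType)

    // Catalyst StructType -> Parquet MessageType, the direction the tests use
    // via new ParquetSchemaConverter().convert(schema).
    val parquetMessageType: MessageType = converter.convert(catalystSchema)
    println(parquetMessageType)
  }
}

On the read path the renamed classes are wired through the same Hadoop configuration keys as before: ParquetFileFormat now registers classOf[ParquetReadSupport] under ParquetInputFormat.READ_SUPPORT_CLASS and stores the requested schema under ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, while the write-path key still lives on CatalystWriteSupport.SPARK_ROW_SCHEMA, which this change leaves unrenamed.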