author     Reynold Xin <rxin@databricks.com>  2016-06-15 20:05:08 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-06-15 20:05:08 -0700
commit     865e7cc38d2b7cf2d4f7e7b04ccb7b17791a693b (patch)
tree       193154d715dcb186f802939a395ebd86a9b9eb0f
parent     3e6d567a4688f064f2a2259c8e436b7c628a431c (diff)
[SPARK-15979][SQL] Rename various Parquet support classes.
## What changes were proposed in this pull request?

This patch renames various Parquet support classes from CatalystAbc to ParquetAbc. The new naming makes more sense for two reasons:

1. These are not optimizer-related (i.e. Catalyst) classes.
2. We are already in the Spark code base, so it is clearer to call out that these are Parquet support classes rather than generic Spark classes.

## How was this patch tested?

Renamed test cases as well.

Author: Reynold Xin <rxin@databricks.com>

Closes #13696 from rxin/parquet-rename.
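For readers skimming the diff below, here is a minimal usage sketch of the renamed schema converter; it is not part of this patch, the object name `ParquetRenameSketch` is hypothetical, and it assumes the snippet lives in the `org.apache.spark.sql.execution.datasources.parquet` package because these classes are `private[parquet]`.

```scala
// Sketch only: the rename does not change behavior, so usage is the same as with
// the old Catalyst-prefixed names, just under the Parquet-prefixed ones.
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types._

object ParquetRenameSketch {
  def roundTrip(): (MessageType, StructType) = {
    val sparkSchema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // Formerly `new CatalystSchemaConverter(...)`.
    val converter = new ParquetSchemaConverter(writeLegacyParquetFormat = false)

    // Spark SQL StructType -> Parquet MessageType, and back again.
    val parquetSchema: MessageType = converter.convert(sparkSchema)
    val sparkSchemaAgain: StructType = converter.convert(parquetSchema)
    (parquetSchema, sparkSchemaAgain)
  }
}
```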
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 6
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java | 8
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala) | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala) | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala) | 78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala) | 30
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala | 4
14 files changed, 120 insertions(+), 123 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index b5b2a681e9..62e09d2e0c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -141,7 +141,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Map[String, String] = {
- def computeNumFeatures(): Int = {
+ val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
val path = if (dataFiles.length == 1) {
dataFiles.head.getPath.toUri.toString
@@ -156,10 +156,6 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
MLUtils.computeNumFeatures(parsed)
}
- val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
- computeNumFeatures()
- }
-
new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString))
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 14626e54b2..d823275d85 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -136,7 +136,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
- this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema);
+ this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
@@ -195,12 +195,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
}
builder.addFields(fileSchema.getType(s));
}
- this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
+ this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
} else {
- this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
+ this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
}
}
- this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
+ this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ea37a08ab5..662a03d3b5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -259,7 +259,7 @@ public class VectorizedColumnReader {
for (int i = rowId; i < rowId + num; ++i) {
// TODO: Convert dictionary of Binaries to dictionary of Longs
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
- column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
+ column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
} else {
throw new NotImplementedException();
@@ -280,12 +280,12 @@ public class VectorizedColumnReader {
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
- column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v));
+ column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
- column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
+ column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
@@ -375,7 +375,7 @@ public class VectorizedColumnReader {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
// Read 12 bytes for INT96
- CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+ ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
} else {
column.putNull(rowId + i);
}
@@ -394,7 +394,7 @@ public class VectorizedColumnReader {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putInt(rowId + i,
- (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+ (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
} else {
column.putNull(rowId + i);
}
@@ -403,7 +403,7 @@ public class VectorizedColumnReader {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
- CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+ ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
} else {
column.putNull(rowId + i);
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 9c1898994c..641c5cb02b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -207,6 +207,20 @@ trait FileFormat {
dataSchema: StructType): OutputWriterFactory
/**
+ * Returns a [[OutputWriterFactory]] for generating output writers that can write data.
+ * This method is current used only by FileStreamSinkWriter to generate output writers that
+ * does not use output committers to write data. The OutputWriter generated by the returned
+ * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
+ */
+ def buildWriter(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ options: Map[String, String]): OutputWriterFactory = {
+ // TODO: Remove this default implementation when the other formats have been ported
+ throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
+ }
+
+ /**
* Returns whether this format support returning columnar batch or not.
*
* TODO: we should just have different traits for the different formats.
@@ -293,19 +307,6 @@ trait FileFormat {
}
}
- /**
- * Returns a [[OutputWriterFactory]] for generating output writers that can write data.
- * This method is current used only by FileStreamSinkWriter to generate output writers that
- * does not use output committers to write data. The OutputWriter generated by the returned
- * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
- */
- def buildWriter(
- sqlContext: SQLContext,
- dataSchema: StructType,
- options: Map[String, String]): OutputWriterFactory = {
- // TODO: Remove this default implementation when the other formats have been ported
- throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
- }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 67bfd39697..cf974afb26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.minBytesForPrecision
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -82,8 +82,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
}
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
- val messageType = new CatalystSchemaConverter(configuration).convert(schema)
- val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+ val messageType = new ParquetSchemaConverter(configuration).convert(schema)
+ val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
logInfo(
s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -427,7 +427,7 @@ private[parquet] object CatalystWriteSupport {
val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
def setSchema(schema: StructType, configuration: Configuration): Unit = {
- schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
+ schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)
configuration.set(SPARK_ROW_SCHEMA, schema.json)
configuration.setIfUnset(
ParquetOutputFormat.WRITER_VERSION,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b25e36f7b..2cce3db9a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -288,13 +288,13 @@ private[sql] class ParquetFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
- hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
- CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+ ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
hadoopConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+ ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
@@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat
val reader = pushed match {
case Some(filter) =>
new ParquetRecordReader[InternalRow](
- new CatalystReadSupport,
+ new ParquetReadSupport,
FilterCompat.get(filter, null))
case _ =>
- new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+ new ParquetRecordReader[InternalRow](new ParquetReadSupport)
}
reader.initialize(split, hadoopAttemptContext)
reader
@@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging {
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
- conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
// Try to push down filters when filter push-down is enabled.
if (parquetFilterPushDown) {
@@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging {
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}
- conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+ conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
- CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+ ParquetSchemaConverter.checkFieldNames(requestedSchema).json
})
conf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
- CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+ ParquetSchemaConverter.checkFieldNames(dataSchema).json)
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
@@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging {
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
def parseParquetSchema(schema: MessageType): StructType = {
- val converter = new CatalystSchemaConverter(
+ val converter = new ParquetSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.writeLegacyParquetFormat)
@@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging {
val serializedSchema = metadata
.getKeyValueMetaData
.asScala.toMap
- .get(CatalystReadSupport.SPARK_METADATA_KEY)
+ .get(ParquetReadSupport.SPARK_METADATA_KEY)
if (serializedSchema.isEmpty) {
// Falls back to Parquet schema if no Spark SQL schema found.
Some(parseParquetSchema(metadata.getSchema))
@@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter =
- new CatalystSchemaConverter(
+ new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging {
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
- footer: Footer, converter: CatalystSchemaConverter): StructType = {
+ footer: Footer, converter: ParquetSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
.asScala.toMap
- .get(CatalystReadSupport.SPARK_METADATA_KEY)
+ .get(ParquetReadSupport.SPARK_METADATA_KEY)
.flatMap(deserializeSchemaString)
.getOrElse(converter.convert(fileMetaData.getSchema))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 9c885b252f..e6ef634421 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
* Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
* to [[prepareForRead()]], but use a private `var` for simplicity.
*/
-private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
private var catalystRequestedSchema: StructType = _
/**
@@ -58,13 +58,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
override def init(context: InitContext): ReadContext = {
catalystRequestedSchema = {
val conf = context.getConfiguration
- val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+ val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
assert(schemaString != null, "Parquet requested schema not set.")
StructType.fromString(schemaString)
}
val parquetRequestedSchema =
- CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+ ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
@@ -92,13 +92,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
""".stripMargin
}
- new CatalystRecordMaterializer(
+ new ParquetRecordMaterializer(
parquetRequestedSchema,
- CatalystReadSupport.expandUDT(catalystRequestedSchema))
+ ParquetReadSupport.expandUDT(catalystRequestedSchema))
}
}
-private[parquet] object CatalystReadSupport {
+private[parquet] object ParquetReadSupport {
val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
@@ -110,12 +110,12 @@ private[parquet] object CatalystReadSupport {
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
if (clippedParquetFields.isEmpty) {
- CatalystSchemaConverter.EMPTY_MESSAGE
+ ParquetSchemaConverter.EMPTY_MESSAGE
} else {
Types
.buildMessage()
.addFields(clippedParquetFields: _*)
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
}
@@ -269,7 +269,7 @@ private[parquet] object CatalystReadSupport {
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
- val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
+ val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index eeead9f5d8..0818d802b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
*/
-private[parquet] class CatalystRecordMaterializer(
+private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType)
extends RecordMaterializer[InternalRow] {
- private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+ private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
override def getCurrentRecord: InternalRow = rootConverter.currentRecord
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 85b0bc17ed..9dad59647e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -70,7 +70,7 @@ private[parquet] trait HasParentContainerUpdater {
/**
* A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
*/
-private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater)
+private[parquet] abstract class ParquetGroupConverter(val updater: ParentContainerUpdater)
extends GroupConverter with HasParentContainerUpdater
/**
@@ -78,7 +78,7 @@ private[parquet] abstract class CatalystGroupConverter(val updater: ParentContai
* are handled by this converter. Parquet primitive types are only a subset of those of Spark
* SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
*/
-private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
+private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpdater)
extends PrimitiveConverter with HasParentContainerUpdater {
override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
@@ -90,7 +90,7 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
}
/**
- * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * A [[ParquetRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
* Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
* converter. Take the following Parquet type as an example:
* {{{
@@ -104,11 +104,11 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
* }}}
* 5 converters will be created:
*
- * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
- * - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
- * - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
- * - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
- * - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
+ * - a root [[ParquetRowConverter]] for [[MessageType]] `root`, which contains:
+ * - a [[ParquetPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ * - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ * - a [[ParquetPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ * - a [[ParquetStringConverter]] for optional [[UTF8]] string field `f22`
*
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
@@ -118,11 +118,11 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
* types should have been expanded.
* @param updater An updater which propagates converted field values to the parent container
*/
-private[parquet] class CatalystRowConverter(
+private[parquet] class ParquetRowConverter(
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
- extends CatalystGroupConverter(updater) with Logging {
+ extends ParquetGroupConverter(updater) with Logging {
assert(
parquetType.getFieldCount == catalystType.length,
@@ -150,7 +150,7 @@ private[parquet] class CatalystRowConverter(
""".stripMargin)
/**
- * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
+ * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
* converted filed values to the `ordinal`-th cell in `currentRow`.
*/
private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
@@ -213,33 +213,33 @@ private[parquet] class CatalystRowConverter(
catalystType match {
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
- new CatalystPrimitiveConverter(updater)
+ new ParquetPrimitiveConverter(updater)
case ByteType =>
- new CatalystPrimitiveConverter(updater) {
+ new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
updater.setByte(value.asInstanceOf[ByteType#InternalType])
}
case ShortType =>
- new CatalystPrimitiveConverter(updater) {
+ new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
updater.setShort(value.asInstanceOf[ShortType#InternalType])
}
// For INT32 backed decimals
case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
- new CatalystIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+ new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
// For INT64 backed decimals
case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
- new CatalystLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+ new ParquetLongDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
// For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
case t: DecimalType
if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
- new CatalystBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
+ new ParquetBinaryDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
case t: DecimalType =>
throw new RuntimeException(
@@ -248,11 +248,11 @@ private[parquet] class CatalystRowConverter(
"FIXED_LEN_BYTE_ARRAY, or BINARY.")
case StringType =>
- new CatalystStringConverter(updater)
+ new ParquetStringConverter(updater)
case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
- new CatalystPrimitiveConverter(updater) {
+ new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
assert(
@@ -268,7 +268,7 @@ private[parquet] class CatalystRowConverter(
}
case DateType =>
- new CatalystPrimitiveConverter(updater) {
+ new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
// DateType is not specialized in `SpecificMutableRow`, have to box it here.
updater.set(value.asInstanceOf[DateType#InternalType])
@@ -286,13 +286,13 @@ private[parquet] class CatalystRowConverter(
}
case t: ArrayType =>
- new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
+ new ParquetArrayConverter(parquetType.asGroupType(), t, updater)
case t: MapType =>
- new CatalystMapConverter(parquetType.asGroupType(), t, updater)
+ new ParquetMapConverter(parquetType.asGroupType(), t, updater)
case t: StructType =>
- new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
+ new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})
@@ -306,8 +306,8 @@ private[parquet] class CatalystRowConverter(
/**
* Parquet converter for strings. A dictionary is used to minimize string decoding cost.
*/
- private final class CatalystStringConverter(updater: ParentContainerUpdater)
- extends CatalystPrimitiveConverter(updater) {
+ private final class ParquetStringConverter(updater: ParentContainerUpdater)
+ extends ParquetPrimitiveConverter(updater) {
private var expandedDictionary: Array[UTF8String] = null
@@ -337,9 +337,9 @@ private[parquet] class CatalystRowConverter(
/**
* Parquet converter for fixed-precision decimals.
*/
- private abstract class CatalystDecimalConverter(
+ private abstract class ParquetDecimalConverter(
precision: Int, scale: Int, updater: ParentContainerUpdater)
- extends CatalystPrimitiveConverter(updater) {
+ extends ParquetPrimitiveConverter(updater) {
protected var expandedDictionary: Array[Decimal] = _
@@ -371,7 +371,7 @@ private[parquet] class CatalystRowConverter(
protected def decimalFromBinary(value: Binary): Decimal = {
if (precision <= Decimal.MAX_LONG_DIGITS) {
// Constructs a `Decimal` with an unscaled `Long` value if possible.
- val unscaled = CatalystRowConverter.binaryToUnscaledLong(value)
+ val unscaled = ParquetRowConverter.binaryToUnscaledLong(value)
Decimal(unscaled, precision, scale)
} else {
// Otherwise, resorts to an unscaled `BigInteger` instead.
@@ -380,9 +380,9 @@ private[parquet] class CatalystRowConverter(
}
}
- private class CatalystIntDictionaryAwareDecimalConverter(
+ private class ParquetIntDictionaryAwareDecimalConverter(
precision: Int, scale: Int, updater: ParentContainerUpdater)
- extends CatalystDecimalConverter(precision, scale, updater) {
+ extends ParquetDecimalConverter(precision, scale, updater) {
override def setDictionary(dictionary: Dictionary): Unit = {
this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
@@ -391,9 +391,9 @@ private[parquet] class CatalystRowConverter(
}
}
- private class CatalystLongDictionaryAwareDecimalConverter(
+ private class ParquetLongDictionaryAwareDecimalConverter(
precision: Int, scale: Int, updater: ParentContainerUpdater)
- extends CatalystDecimalConverter(precision, scale, updater) {
+ extends ParquetDecimalConverter(precision, scale, updater) {
override def setDictionary(dictionary: Dictionary): Unit = {
this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
@@ -402,9 +402,9 @@ private[parquet] class CatalystRowConverter(
}
}
- private class CatalystBinaryDictionaryAwareDecimalConverter(
+ private class ParquetBinaryDictionaryAwareDecimalConverter(
precision: Int, scale: Int, updater: ParentContainerUpdater)
- extends CatalystDecimalConverter(precision, scale, updater) {
+ extends ParquetDecimalConverter(precision, scale, updater) {
override def setDictionary(dictionary: Dictionary): Unit = {
this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { id =>
@@ -431,11 +431,11 @@ private[parquet] class CatalystRowConverter(
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*/
- private final class CatalystArrayConverter(
+ private final class ParquetArrayConverter(
parquetSchema: GroupType,
catalystSchema: ArrayType,
updater: ParentContainerUpdater)
- extends CatalystGroupConverter(updater) {
+ extends ParquetGroupConverter(updater) {
private var currentArray: ArrayBuffer[Any] = _
@@ -512,11 +512,11 @@ private[parquet] class CatalystRowConverter(
}
/** Parquet converter for maps */
- private final class CatalystMapConverter(
+ private final class ParquetMapConverter(
parquetType: GroupType,
catalystType: MapType,
updater: ParentContainerUpdater)
- extends CatalystGroupConverter(updater) {
+ extends ParquetGroupConverter(updater) {
private var currentKeys: ArrayBuffer[Any] = _
private var currentValues: ArrayBuffer[Any] = _
@@ -638,7 +638,7 @@ private[parquet] class CatalystRowConverter(
}
}
-private[parquet] object CatalystRowConverter {
+private[parquet] object ParquetRowConverter {
def binaryToUnscaledLong(binary: Binary): Long = {
// The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
// we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 3688c3e2b5..bcf535d455 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -26,7 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.maxPrecisionForBytes
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.maxPrecisionForBytes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -52,7 +52,7 @@ import org.apache.spark.sql.types._
* When set to false, use standard format defined in parquet-format spec. This argument only
* affects Parquet write path.
*/
-private[parquet] class CatalystSchemaConverter(
+private[parquet] class ParquetSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
@@ -125,7 +125,7 @@ private[parquet] class CatalystSchemaConverter(
val precision = field.getDecimalMetadata.getPrecision
val scale = field.getDecimalMetadata.getScale
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
@@ -163,7 +163,7 @@ private[parquet] class CatalystSchemaConverter(
}
case INT96 =>
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
assumeInt96IsTimestamp,
"INT96 is not supported unless it's interpreted as timestamp. " +
s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
@@ -206,11 +206,11 @@ private[parquet] class CatalystSchemaConverter(
//
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
case LIST =>
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
field.getFieldCount == 1, s"Invalid list type $field")
val repeatedType = field.getType(0)
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
if (isElementType(repeatedType, field.getName)) {
@@ -226,17 +226,17 @@ private[parquet] class CatalystSchemaConverter(
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
// scalastyle:on
case MAP | MAP_KEY_VALUE =>
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
field.getFieldCount == 1 && !field.getType(0).isPrimitive,
s"Invalid map type: $field")
val keyValueType = field.getType(0).asGroupType()
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
s"Invalid map type: $field")
val keyType = keyValueType.getType(0)
- CatalystSchemaConverter.checkConversionRequirement(
+ ParquetSchemaConverter.checkConversionRequirement(
keyType.isPrimitive,
s"Map key type is expected to be a primitive type, but found: $keyType")
@@ -311,7 +311,7 @@ private[parquet] class CatalystSchemaConverter(
Types
.buildMessage()
.addFields(catalystSchema.map(convertField): _*)
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
/**
@@ -322,7 +322,7 @@ private[parquet] class CatalystSchemaConverter(
}
private def convertField(field: StructField, repetition: Type.Repetition): Type = {
- CatalystSchemaConverter.checkFieldName(field.name)
+ ParquetSchemaConverter.checkFieldName(field.name)
field.dataType match {
// ===================
@@ -394,7 +394,7 @@ private[parquet] class CatalystSchemaConverter(
.as(DECIMAL)
.precision(precision)
.scale(scale)
- .length(CatalystSchemaConverter.minBytesForPrecision(precision))
+ .length(ParquetSchemaConverter.minBytesForPrecision(precision))
.named(field.name)
// ========================
@@ -428,7 +428,7 @@ private[parquet] class CatalystSchemaConverter(
.as(DECIMAL)
.precision(precision)
.scale(scale)
- .length(CatalystSchemaConverter.minBytesForPrecision(precision))
+ .length(ParquetSchemaConverter.minBytesForPrecision(precision))
.named(field.name)
// ===================================
@@ -535,7 +535,7 @@ private[parquet] class CatalystSchemaConverter(
}
}
-private[parquet] object CatalystSchemaConverter {
+private[parquet] object ParquetSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
// !! HACK ALERT !!
@@ -551,7 +551,7 @@ private[parquet] object CatalystSchemaConverter {
val EMPTY_MESSAGE = Types
.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
EMPTY_MESSAGE.getFields.clear()
def checkFieldName(name: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 92f2db325c..fc9ce6bb30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -362,7 +362,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
- val expectedSchema = new CatalystSchemaConverter().convert(schema)
+ val expectedSchema = new ParquetSchemaConverter().convert(schema)
val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
actualSchema.checkContains(expectedSchema)
@@ -432,7 +432,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
""".stripMargin)
withTempPath { location =>
- val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
+ val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
val path = new Path(location.getCanonicalPath)
val conf = spark.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf, extraMetadata)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b4fd0ef6ed..83d10010f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -574,7 +574,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
test("expand UDT in StructType") {
val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)
- assert(CatalystReadSupport.expandUDT(schema) === expected)
+ assert(ParquetReadSupport.expandUDT(schema) === expected)
}
test("expand UDT in ArrayType") {
@@ -592,7 +592,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
containsNull = false),
nullable = true)
- assert(CatalystReadSupport.expandUDT(schema) === expected)
+ assert(ParquetReadSupport.expandUDT(schema) === expected)
}
test("expand UDT in MapType") {
@@ -612,7 +612,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
valueContainsNull = false),
nullable = true)
- assert(CatalystReadSupport.expandUDT(schema) === expected)
+ assert(ParquetReadSupport.expandUDT(schema) === expected)
}
test("returning batch for wide table") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 865bfa24c5..8a980a7eb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -54,7 +54,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean): Unit = {
- val converter = new CatalystSchemaConverter(
+ val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -78,7 +78,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean): Unit = {
- val converter = new CatalystSchemaConverter(
+ val converter = new ParquetSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -1062,7 +1062,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema: StructType,
expectedSchema: MessageType): Unit = {
test(s"Clipping - $testName") {
- val actual = CatalystReadSupport.clipParquetSchema(
+ val actual = ParquetReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
try {
@@ -1424,7 +1424,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema = new StructType(),
- expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
+ expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
testSchemaClipping(
"disjoint field sets",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 1953d6fa5a..9fb34e03cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -124,8 +124,8 @@ private[sql] trait ParquetTest extends SQLTestUtils {
protected def writeMetadata(
schema: StructType, path: Path, configuration: Configuration): Unit = {
- val parquetSchema = new CatalystSchemaConverter().convert(schema)
- val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
+ val parquetSchema = new ParquetSchemaConverter().convert(schema)
+ val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)