author     Cheng Lian <lian@databricks.com>    2015-07-26 16:49:19 -0700
committer  Reynold Xin <rxin@databricks.com>   2015-07-26 16:49:19 -0700
commit     c025c3d0a1fdfbc45b64db9c871176b40b4a7b9b (patch)
tree       71d106872ef4a948f187433ad4cd291427949745
parent     6b2baec04fa3d928f0ee84af8c2723ac03a4648c (diff)
[SPARK-9095] [SQL] Removes the old Parquet support
This PR removes the old Parquet support:

- Removes the old `ParquetRelation` together with related SQL configuration, plan nodes, strategies, utility classes, and test suites.
- Renames `ParquetRelation2` to `ParquetRelation`.
- Renames `RowReadSupport` and `RowRecordMaterializer` to `CatalystReadSupport` and `CatalystRecordMaterializer` respectively, and moves them to separate files. This follows the naming convention used by the other Parquet data models implemented in parquet-mr, so it should be easier to follow for developers who are familiar with Parquet.

There is still other code that can be cleaned up, especially `RowWriteSupport`, but I'd like to leave that part to SPARK-8848.

Author: Cheng Lian <lian@databricks.com>

Closes #7441 from liancheng/spark-9095 and squashes the following commits:

c7b6e38 [Cheng Lian] Removes WriteToFile
2d688d6 [Cheng Lian] Renames ParquetRelation2 to ParquetRelation
ca9e1b7 [Cheng Lian] Removes old Parquet support
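For orientation, here is a minimal sketch of the data source API path that remains after this removal, mirroring the calls the deprecated entry points now delegate to (the `sqlContext` value and the paths below are placeholders, not part of this patch):

    import org.apache.spark.sql.SaveMode

    // Read Parquet through the data source API (the only remaining code path).
    val df = sqlContext.read.parquet("/tmp/input.parquet")

    // Write Parquet through the data source API, exactly as the deprecated
    // saveAsParquetFile now does internally.
    df.write.format("parquet").mode(SaveMode.ErrorIfExists).save("/tmp/output.parquet")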
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala | 153
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala | 41
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 843
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 492
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 151
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 732
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 65
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 27
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 141
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala | 86
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 54
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 174
27 files changed, 1037 insertions, 2152 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8e1a236e29..af68358daf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -186,12 +186,6 @@ case class WithWindowDefinition(
override def output: Seq[Attribute] = child.output
}
-case class WriteToFile(
- path: String,
- child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
/**
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index fa942a1f8f..114ab91d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -139,8 +139,7 @@ class DataFrame private[sql](
// happen right away to let these side effects take place eagerly.
case _: Command |
_: InsertIntoTable |
- _: CreateTableUsingAsSelect |
- _: WriteToFile =>
+ _: CreateTableUsingAsSelect =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
queryExecution.analyzed
@@ -1615,11 +1614,7 @@ class DataFrame private[sql](
*/
@deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
- if (sqlContext.conf.parquetUseDataSourceApi) {
- write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
- } else {
- sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
- }
+ write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
}
/**
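With the branch on `spark.sql.parquet.useDataSourceApi` gone, the deprecated `saveAsParquetFile` is now a thin wrapper over the writer API. A hedged sketch of the now-equivalent calls (`df` and the path are placeholders; run one or the other, since `ErrorIfExists` refuses to overwrite):

    import org.apache.spark.sql.SaveMode

    df.saveAsParquetFile("/tmp/out")                                          // deprecated since 1.4.0
    df.write.format("parquet").mode(SaveMode.ErrorIfExists).save("/tmp/out")  // what it delegates to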
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e9d782cdcd..eb09807f9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,16 +21,16 @@ import java.util.Properties
import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, Partition}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Logging, Partition}
/**
* :: Experimental ::
@@ -259,7 +259,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}.toArray
sqlContext.baseRelationToDataFrame(
- new ParquetRelation2(
+ new ParquetRelation(
globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
}
}
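Because `DataFrameReader.parquet` now hands `extraOptions.toMap` straight to the renamed `ParquetRelation`, reader options flow through as relation parameters. A small sketch, assuming a placeholder path and using the `mergeSchema` key that `ParquetRelation.MERGE_SCHEMA` defines later in this patch:

    // Request schema merging across Parquet part-files for this read only.
    val merged = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("/data/events")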
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2a641b9d64..9b2dbd7442 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -276,10 +276,6 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "Enables Parquet filter push-down optimization when set to true.")
- val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
- defaultValue = Some(true),
- doc = "<TODO>")
-
val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
key = "spark.sql.parquet.followParquetFormatSpec",
defaultValue = Some(false),
@@ -456,8 +452,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
- private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)
-
private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 49bfe74b68..0e25e06e99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -870,7 +870,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
LeftSemiJoin ::
HashJoin ::
InMemoryScans ::
- ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil)
@@ -1115,11 +1114,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(paths: String*): DataFrame = {
if (paths.isEmpty) {
emptyDataFrame
- } else if (conf.parquetUseDataSourceApi) {
- read.parquet(paths : _*)
} else {
- DataFrame(this, parquet.ParquetRelation(
- paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+ read.parquet(paths : _*)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index eb4be1900b..e2c7e8006f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,19 +17,18 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
-import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -306,57 +305,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
- object ParquetOperations extends Strategy {
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- // TODO: need to support writing to other types of files. Unify the below code paths.
- case logical.WriteToFile(path, child) =>
- val relation =
- ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
- // Note: overwrite=false because otherwise the metadata we just created will be deleted
- InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
- case logical.InsertIntoTable(
- table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
- InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
- case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
- val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
- val filtersToPush = filters.filter { pred =>
- val referencedColNames = pred.references.map(_.name).toSet
- referencedColNames.intersect(partitionColNames).isEmpty
- }
- val prunePushedDownFilters =
- if (sqlContext.conf.parquetFilterPushDown) {
- (predicates: Seq[Expression]) => {
- // Note: filters cannot be pushed down to Parquet if they contain more complex
- // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
- // filters that have been pushed down. Note that a predicate such as "(A AND B) OR C"
- // can result in "A OR C" being pushed down. Here we are conservative in the sense
- // that even if "A" was pushed and we check for "A AND B" we still want to keep
- // "A AND B" in the higher-level filter, not just "B".
- predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
- case (predicate, None) => predicate
- // Filter needs to be applied above when it contains partitioning
- // columns
- case (predicate, _)
- if !predicate.references.map(_.name).toSet.intersect(partitionColNames).isEmpty =>
- predicate
- }
- }
- } else {
- identity[Seq[Expression]] _
- }
- pruneFilterProject(
- projectList,
- filters,
- prunePushedDownFilters,
- ParquetTableScan(
- _,
- relation,
- if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
-
- case _ => Nil
- }
- }
-
object InMemoryScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
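The comment removed above from `ParquetOperations` explains why pushed-down Parquet filters must still be re-evaluated by Spark: a predicate such as `(A AND B) OR C` can result in `A OR C` being pushed down, so the full predicate has to stay in the higher-level filter unless it was converted in its entirety and touches no partition columns. A rough sketch of that conservative pruning; `convertToParquetFilter` and `referencesPartitionColumn` are hypothetical stand-ins, not Spark APIs:

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Hypothetical helpers for illustration only.
    def convertToParquetFilter(pred: Expression): Option[AnyRef] = None  // stands in for ParquetFilters.createFilter
    def referencesPartitionColumn(pred: Expression): Boolean = false     // stands in for the partition-column check

    // Keep a predicate in the Spark-side filter when it could not be fully converted
    // or when it references a partition column; only fully converted, non-partition
    // predicates may be dropped, because Parquet has already applied them.
    def pruneSparkSideFilters(predicates: Seq[Expression]): Seq[Expression] =
      predicates.filter { pred =>
        convertToParquetFilter(pred).isEmpty || referencesPartitionColumn(pred)
      }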
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
new file mode 100644
index 0000000000..975fec101d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+ override def prepareForRead(
+ conf: Configuration,
+ keyValueMetaData: JMap[String, String],
+ fileSchema: MessageType,
+ readContext: ReadContext): RecordMaterializer[InternalRow] = {
+ log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
+
+ val toCatalyst = new CatalystSchemaConverter(conf)
+ val parquetRequestedSchema = readContext.getRequestedSchema
+
+ val catalystRequestedSchema =
+ Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
+ metadata
+ // First tries to read requested schema, which may result from projections
+ .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+ // If not available, tries to read Catalyst schema from file metadata. It's only
+ // available if the target file is written by Spark SQL.
+ .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
+ }.map(StructType.fromString).getOrElse {
+ logDebug("Catalyst schema not available, falling back to Parquet schema")
+ toCatalyst.convert(parquetRequestedSchema)
+ }
+
+ logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+ new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+ }
+
+ override def init(context: InitContext): ReadContext = {
+ val conf = context.getConfiguration
+
+ // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
+ // schema of this file from its metadata.
+ val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
+
+ // Optional schema of requested columns, in the form of a string serialized from a Catalyst
+ // `StructType` containing all requested columns.
+ val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
+
+ // Below we construct a Parquet schema containing all requested columns. This schema tells
+ // Parquet which columns to read.
+ //
+ // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
+ // we have to fall back to the full file schema, which contains all columns in the file.
+ // Obviously this may waste IO bandwidth since it may read more columns than requested.
+ //
+ // Two things to note:
+ //
+ // 1. It's possible that some requested columns don't exist in the target Parquet file. For
+ // example, in the case of schema merging, the globally merged schema may contain extra
+ // columns gathered from other Parquet files. These columns will be simply filled with nulls
+ // when actually reading the target Parquet file.
+ //
+ // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
+ // Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
+ // non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
+ // containing a single integer array field `f1` may have the following legacy 2-level
+ // structure:
+ //
+ // message root {
+ // optional group f1 (LIST) {
+ // required INT32 element;
+ // }
+ // }
+ //
+ // while `CatalystSchemaConverter` may generate a standard 3-level structure:
+ //
+ // message root {
+ // optional group f1 (LIST) {
+ // repeated group list {
+ // required INT32 element;
+ // }
+ // }
+ // }
+ //
+ // Apparently, we can't use the 2nd schema to read the target Parquet file as they have
+ // different physical structures.
+ val parquetRequestedSchema =
+ maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
+ val toParquet = new CatalystSchemaConverter(conf)
+ val fileSchema = context.getFileSchema.asGroupType()
+ val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
+
+ StructType
+ // Deserializes the Catalyst schema of requested columns
+ .fromString(schemaString)
+ .map { field =>
+ if (fileFieldNames.contains(field.name)) {
+ // If the field exists in the target Parquet file, extracts the field type from the
+ // full file schema and makes a single-field Parquet schema
+ new MessageType("root", fileSchema.getType(field.name))
+ } else {
+ // Otherwise, just resorts to `CatalystSchemaConverter`
+ toParquet.convert(StructType(Array(field)))
+ }
+ }
+ // Merges all single-field Parquet schemas to form a complete schema for all requested
+ // columns. Note that it's possible that no columns are requested at all (e.g., count
+ // some partition column of a partitioned Parquet table). That's why `fold` is used here
+ // and always fall back to an empty Parquet schema.
+ .fold(new MessageType("root")) {
+ _ union _
+ }
+ }
+
+ val metadata =
+ Map.empty[String, String] ++
+ maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
+ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
+
+ logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
+ new ReadContext(parquetRequestedSchema, metadata)
+ }
+}
+
+private[parquet] object CatalystReadSupport {
+ val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+ val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+}
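The requested Catalyst schema travels from the driver to `CatalystReadSupport` as a JSON string stored under a Hadoop configuration key, and is deserialized again on the executor side. A minimal sketch of that round trip, using the same key and serialization calls as the patch (the field names are made up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.types._

    val requested = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // Driver side: serialize the requested schema into the job configuration.
    val conf = new Configuration()
    conf.set("org.apache.spark.sql.parquet.row.requested_schema", requested.json)

    // Executor side: deserialize it back into a StructType.
    val restored = DataType.fromJson(
      conf.get("org.apache.spark.sql.parquet.row.requested_schema")).asInstanceOf[StructType]
    assert(restored == requested)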
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
new file mode 100644
index 0000000000..84f1dccfeb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[RecordMaterializer]] for Catalyst rows.
+ *
+ * @param parquetSchema Parquet schema of the records to be read
+ * @param catalystSchema Catalyst schema of the rows to be constructed
+ */
+private[parquet] class CatalystRecordMaterializer(
+ parquetSchema: MessageType, catalystSchema: StructType)
+ extends RecordMaterializer[InternalRow] {
+
+ private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+
+ override def getCurrentRecord: InternalRow = rootConverter.currentRow
+
+ override def getRootConverter: GroupConverter = rootConverter
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
index 1d3a0d15d3..e9ef01e2db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
@@ -570,6 +570,11 @@ private[parquet] object CatalystSchemaConverter {
""".stripMargin.split("\n").mkString(" "))
}
+ def checkFieldNames(schema: StructType): StructType = {
+ schema.fieldNames.foreach(checkFieldName)
+ schema
+ }
+
def analysisRequire(f: => Boolean, message: String): Unit = {
if (!f) {
throw new AnalysisException(message)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index be0a2029d2..ea51650fe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
import org.apache.spark.sql.catalyst.InternalRow
+// TODO Remove this while fixing SPARK-8848
private[sql] object CatalystConverter {
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
// Note that "array" for the array elements is chosen by ParquetAvro.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 086559e9f7..cc6fa2b886 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -17,81 +17,720 @@
package org.apache.spark.sql.parquet
-import java.io.IOException
+import java.net.URI
import java.util.logging.{Level, Logger => JLogger}
+import java.util.{List => JList}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.{Failure, Try}
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.{Log => ParquetLog}
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ paths: Array[String],
+ schema: Option[StructType],
+ partitionColumns: Option[StructType],
+ parameters: Map[String, String]): HadoopFsRelation = {
+ new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext)
+ }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
+ extends OutputWriterInternal {
+
+ private val recordWriter: RecordWriter[Void, InternalRow] = {
+ val outputFormat = {
+ new ParquetOutputFormat[InternalRow]() {
+ // Here we override `getDefaultWorkFile` for two reasons:
+ //
+ // 1. To allow appending. We need to generate unique output file names to avoid
+ // overwriting existing files (either exist before the write job, or are just written
+ // by other tasks within the same write job).
+ //
+ // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
+ // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+ // partitions in the case of dynamic partitioning.
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+ val split = context.getTaskAttemptID.getTaskID.getId
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+ }
+ }
+ }
+
+ outputFormat.getRecordWriter(context)
+ }
+
+ override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
+
+ override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation(
+ override val paths: Array[String],
+ private val maybeDataSchema: Option[StructType],
+ // This is for metastore conversion.
+ private val maybePartitionSpec: Option[PartitionSpec],
+ override val userDefinedPartitionColumns: Option[StructType],
+ parameters: Map[String, String])(
+ val sqlContext: SQLContext)
+ extends HadoopFsRelation(maybePartitionSpec)
+ with Logging {
+
+ private[sql] def this(
+ paths: Array[String],
+ maybeDataSchema: Option[StructType],
+ maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])(
+ sqlContext: SQLContext) = {
+ this(
+ paths,
+ maybeDataSchema,
+ maybePartitionSpec,
+ maybePartitionSpec.map(_.partitionColumns),
+ parameters)(sqlContext)
+ }
+
+ // Should we merge schemas from all Parquet part-files?
+ private val shouldMergeSchemas =
+ parameters
+ .get(ParquetRelation.MERGE_SCHEMA)
+ .map(_.toBoolean)
+ .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+
+ private val maybeMetastoreSchema = parameters
+ .get(ParquetRelation.METASTORE_SCHEMA)
+ .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+ private lazy val metadataCache: MetadataCache = {
+ val meta = new MetadataCache
+ meta.refresh()
+ meta
+ }
-/**
- * Relation that consists of data stored in a Parquet columnar format.
- *
- * Users should interact with parquet files though a [[DataFrame]], created by a [[SQLContext]]
- * instead of using this class directly.
- *
- * {{{
- * val parquetRDD = sqlContext.parquetFile("path/to/parquet.file")
- * }}}
- *
- * @param path The path to the Parquet file.
- */
-private[sql] case class ParquetRelation(
- path: String,
- @transient conf: Option[Configuration],
- @transient sqlContext: SQLContext,
- partitioningAttributes: Seq[Attribute] = Nil)
- extends LeafNode with MultiInstanceRelation {
-
- /** Schema derived from ParquetFile */
- def parquetSchema: MessageType =
- ParquetTypesConverter
- .readMetaData(new Path(path), conf)
- .getFileMetaData
- .getSchema
-
- /** Attributes */
- override val output =
- partitioningAttributes ++
- ParquetTypesConverter.readSchemaFromFile(
- new Path(path.split(",").head),
- conf,
- sqlContext.conf.isParquetBinaryAsString,
- sqlContext.conf.isParquetINT96AsTimestamp)
- lazy val attributeMap = AttributeMap(output.map(o => o -> o))
-
- override def newInstance(): this.type = {
- ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
- }
-
- // Equals must also take into account the output attributes so that we can distinguish between
- // different instances of the same relation,
override def equals(other: Any): Boolean = other match {
- case p: ParquetRelation =>
- p.path == path && p.output == output
+ case that: ParquetRelation =>
+ val schemaEquality = if (shouldMergeSchemas) {
+ this.shouldMergeSchemas == that.shouldMergeSchemas
+ } else {
+ this.dataSchema == that.dataSchema &&
+ this.schema == that.schema
+ }
+
+ this.paths.toSet == that.paths.toSet &&
+ schemaEquality &&
+ this.maybeDataSchema == that.maybeDataSchema &&
+ this.partitionColumns == that.partitionColumns
+
case _ => false
}
- override def hashCode: Int = {
- com.google.common.base.Objects.hashCode(path, output)
+ override def hashCode(): Int = {
+ if (shouldMergeSchemas) {
+ Objects.hashCode(
+ Boolean.box(shouldMergeSchemas),
+ paths.toSet,
+ maybeDataSchema,
+ partitionColumns)
+ } else {
+ Objects.hashCode(
+ Boolean.box(shouldMergeSchemas),
+ paths.toSet,
+ dataSchema,
+ schema,
+ maybeDataSchema,
+ partitionColumns)
+ }
+ }
+
+ /** Constraints on schema of dataframe to be stored. */
+ private def checkConstraints(schema: StructType): Unit = {
+ if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+ val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }.mkString(", ")
+ throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+ s"cannot save to parquet format")
+ }
+ }
+
+ override def dataSchema: StructType = {
+ val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+ // check if schema satisfies the constraints
+ // before moving forward
+ checkConstraints(schema)
+ schema
+ }
+
+ override private[sql] def refresh(): Unit = {
+ super.refresh()
+ metadataCache.refresh()
+ }
+
+ // Parquet data source always uses Catalyst internal representations.
+ override val needConversion: Boolean = false
+
+ override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+ override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+ val conf = ContextUtil.getConfiguration(job)
+
+ val committerClass =
+ conf.getClass(
+ SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
+ classOf[ParquetOutputCommitter],
+ classOf[ParquetOutputCommitter])
+
+ if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
+ logInfo("Using default output committer for Parquet: " +
+ classOf[ParquetOutputCommitter].getCanonicalName)
+ } else {
+ logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+ }
+
+ conf.setClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ committerClass,
+ classOf[ParquetOutputCommitter])
+
+ // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+ // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
+ // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
+ // bundled with `ParquetOutputFormat[Row]`.
+ job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
+ // TODO There's no need to use two kinds of WriteSupport
+ // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
+ // complex types.
+ val writeSupportClass =
+ if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
+ classOf[MutableRowWriteSupport]
+ } else {
+ classOf[RowWriteSupport]
+ }
+
+ ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+ RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+
+ // Sets compression scheme
+ conf.set(
+ ParquetOutputFormat.COMPRESSION,
+ ParquetRelation
+ .shortParquetCompressionCodecNames
+ .getOrElse(
+ sqlContext.conf.parquetCompressionCodec.toUpperCase,
+ CompressionCodecName.UNCOMPRESSED).name())
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
+ new ParquetOutputWriter(path, context)
+ }
+ }
+ }
+
+ override def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputFiles: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+ val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
+ val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+ val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+ val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+ val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+
+ // Create the function to set variable Parquet confs at both driver and executor side.
+ val initLocalJobFuncOpt =
+ ParquetRelation.initializeLocalJobFunc(
+ requiredColumns,
+ filters,
+ dataSchema,
+ useMetadataCache,
+ parquetFilterPushDown,
+ assumeBinaryIsString,
+ assumeInt96IsTimestamp,
+ followParquetFormatSpec) _
+
+ // Create the function to set input paths at the driver side.
+ val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+
+ Utils.withDummyCallSite(sqlContext.sparkContext) {
+ new SqlNewHadoopRDD(
+ sc = sqlContext.sparkContext,
+ broadcastedConf = broadcastedConf,
+ initDriverSideJobFuncOpt = Some(setInputPaths),
+ initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+ inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
+ keyClass = classOf[Void],
+ valueClass = classOf[InternalRow]) {
+
+ val cacheMetadata = useMetadataCache
+
+ @transient val cachedStatuses = inputFiles.map { f =>
+ // In order to encode the authority of a Path containing special characters such as '/'
+ // (which does happen in some S3N credentials), we need to use the string returned by the
+ // URI of the path to create a new Path.
+ val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
+ new FileStatus(
+ f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+ f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
+ }.toSeq
+
+ private def escapePathUserInfo(path: Path): Path = {
+ val uri = path.toUri
+ new Path(new URI(
+ uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
+ uri.getQuery, uri.getFragment))
+ }
+
+ // Overridden so we can inject our own cached file statuses.
+ override def getPartitions: Array[SparkPartition] = {
+ val inputFormat = new ParquetInputFormat[InternalRow] {
+ override def listStatus(jobContext: JobContext): JList[FileStatus] = {
+ if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
+ }
+ }
+
+ val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+ val rawSplits = inputFormat.getSplits(jobContext)
+
+ Array.tabulate[SparkPartition](rawSplits.size) { i =>
+ new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ }
+ }.values.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
+ }
}
- // TODO: Use data from the footers.
- override lazy val statistics = Statistics(sizeInBytes = sqlContext.conf.defaultSizeInBytes)
+ private class MetadataCache {
+ // `FileStatus` objects of all "_metadata" files.
+ private var metadataStatuses: Array[FileStatus] = _
+
+ // `FileStatus` objects of all "_common_metadata" files.
+ private var commonMetadataStatuses: Array[FileStatus] = _
+
+ // `FileStatus` objects of all data files (Parquet part-files).
+ var dataStatuses: Array[FileStatus] = _
+
+ // Schema of the actual Parquet files, without partition columns discovered from partition
+ // directory paths.
+ var dataSchema: StructType = null
+
+ // Schema of the whole table, including partition columns.
+ var schema: StructType = _
+
+ // Cached leaves
+ var cachedLeaves: Set[FileStatus] = null
+
+ /**
+ * Refreshes `FileStatus`es, footers, partition spec, and table schema.
+ */
+ def refresh(): Unit = {
+ val currentLeafStatuses = cachedLeafStatuses()
+
+ // Check if cachedLeafStatuses is changed or not
+ val leafStatusesChanged = (cachedLeaves == null) ||
+ !cachedLeaves.equals(currentLeafStatuses)
+
+ if (leafStatusesChanged) {
+ cachedLeaves = currentLeafStatuses.toIterator.toSet
+
+ // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+ val leaves = currentLeafStatuses.filter { f =>
+ isSummaryFile(f.getPath) ||
+ !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+ }.toArray
+
+ dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+ metadataStatuses =
+ leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+ commonMetadataStatuses =
+ leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+ dataSchema = {
+ val dataSchema0 = maybeDataSchema
+ .orElse(readSchema())
+ .orElse(maybeMetastoreSchema)
+ .getOrElse(throw new AnalysisException(
+ s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
+ paths.mkString("\n\t")))
+
+ // If this Parquet relation is converted from a Hive Metastore table, we must reconcile the
+ // case insensitivity issue and possible schema mismatch (probably caused by schema
+ // evolution).
+ maybeMetastoreSchema
+ .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
+ .getOrElse(dataSchema0)
+ }
+ }
+ }
+
+ private def isSummaryFile(file: Path): Boolean = {
+ file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+ file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+ }
+
+ private def readSchema(): Option[StructType] = {
+ // Sees which file(s) we need to touch in order to figure out the schema.
+ //
+ // Always tries the summary files first if users don't require a merged schema. In this case,
+ // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+ // groups information, and could be much smaller for large Parquet files with lots of row
+ // groups. If no summary file is available, falls back to some random part-file.
+ //
+ // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
+ // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+ // how to merge them correctly if some key is associated with different values in different
+ // part-files. When this happens, Parquet simply gives up generating the summary file. This
+ // implies that if a summary file is present, then:
+ //
+ // 1. Either all part-files have exactly the same Spark SQL schema, or
+ // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+ // their schemas may differ from each other).
+ //
+ // Here we tend to be pessimistic and take the second case into account. Basically this means
+ // we can't trust the summary files if users require a merged schema, and must touch all part-
+ // files to do the merge.
+ val filesToTouch =
+ if (shouldMergeSchemas) {
+ // Also includes summary files, 'cause there might be empty partition directories.
+ (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+ } else {
+ // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
+ // don't have this.
+ commonMetadataStatuses.headOption
+ // Falls back to "_metadata"
+ .orElse(metadataStatuses.headOption)
+ // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+ // files contain conflicting user defined metadata (two or more values are associated
+ // with the same key in different files). In either case, we fall back to the
+ // first part-file, and just assume all schemas are consistent.
+ .orElse(dataStatuses.headOption)
+ .toSeq
+ }
+
+ assert(
+ filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
+ "No predefined schema found, " +
+ s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
+
+ ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext)
+ }
+ }
}
-private[sql] object ParquetRelation {
+private[sql] object ParquetRelation extends Logging {
+ // Whether we should merge schemas collected from all Parquet part-files.
+ private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+ // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
+ // internally.
+ private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+ /** This closure sets various Parquet configurations at both driver side and executor side. */
+ private[parquet] def initializeLocalJobFunc(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ dataSchema: StructType,
+ useMetadataCache: Boolean,
+ parquetFilterPushDown: Boolean,
+ assumeBinaryIsString: Boolean,
+ assumeInt96IsTimestamp: Boolean,
+ followParquetFormatSpec: Boolean)(job: Job): Unit = {
+ val conf = job.getConfiguration
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+
+ // Try to push down filters when filter push-down is enabled.
+ if (parquetFilterPushDown) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
+ .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+ }
+
+ conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+ val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+ CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+ })
+
+ conf.set(
+ RowWriteSupport.SPARK_ROW_SCHEMA,
+ CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
+
+ // Sets flags for Parquet schema conversion
+ conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
+ conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
+ conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+ }
+
+ /** This closure sets input paths at the driver side. */
+ private[parquet] def initializeDriverSideJobFunc(
+ inputFiles: Array[FileStatus])(job: Job): Unit = {
+ // We set the input paths at the driver side.
+ logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
+ if (inputFiles.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
+ }
+ }
+
+ private[parquet] def readSchema(
+ footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+
+ def parseParquetSchema(schema: MessageType): StructType = {
+ val converter = new CatalystSchemaConverter(
+ sqlContext.conf.isParquetBinaryAsString,
+ sqlContext.conf.isParquetINT96AsTimestamp,
+ sqlContext.conf.followParquetFormatSpec)
+
+ converter.convert(schema)
+ }
+
+ val seen = mutable.HashSet[String]()
+ val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
+ val metadata = footer.getParquetMetadata.getFileMetaData
+ val serializedSchema = metadata
+ .getKeyValueMetaData
+ .toMap
+ .get(CatalystReadSupport.SPARK_METADATA_KEY)
+ if (serializedSchema.isEmpty) {
+ // Falls back to Parquet schema if no Spark SQL schema found.
+ Some(parseParquetSchema(metadata.getSchema))
+ } else if (!seen.contains(serializedSchema.get)) {
+ seen += serializedSchema.get
+
+ // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
+ // whatever is available.
+ Some(Try(DataType.fromJson(serializedSchema.get))
+ .recover { case _: Throwable =>
+ logInfo(
+ s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "falling back to the deprecated DataType.fromCaseClassString parser.")
+ DataType.fromCaseClassString(serializedSchema.get)
+ }
+ .recover { case cause: Throwable =>
+ logWarning(
+ s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+ |\t$serializedSchema
+ """.stripMargin,
+ cause)
+ }
+ .map(_.asInstanceOf[StructType])
+ .getOrElse {
+ // Falls back to Parquet schema if Spark SQL schema can't be parsed.
+ parseParquetSchema(metadata.getSchema)
+ })
+ } else {
+ None
+ }
+ }
+
+ finalSchemas.reduceOption { (left, right) =>
+ try left.merge(right) catch { case e: Throwable =>
+ throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+ }
+ }
+ }
+
+ /**
+ * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
+ * schema and Parquet schema.
+ *
+ * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
+ * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
+ * distinguish binary and string). This method generates a correct schema by merging Metastore
+ * schema data types and Parquet schema field names.
+ */
+ private[parquet] def mergeMetastoreParquetSchema(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ def schemaConflictMessage: String =
+ s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+ |${metastoreSchema.prettyJson}
+ |
+ |Parquet schema:
+ |${parquetSchema.prettyJson}
+ """.stripMargin
+
+ val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+ assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
+
+ val ordinalMap = metastoreSchema.zipWithIndex.map {
+ case (field, index) => field.name.toLowerCase -> index
+ }.toMap
+
+ val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
+ ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
+
+ StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+ // Uses Parquet field names but retains Metastore data types.
+ case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
+ mSchema.copy(name = pSchema.name)
+ case _ =>
+ throw new SparkException(schemaConflictMessage)
+ })
+ }
+
+ /**
+ * Returns the original schema from the Parquet file with any missing nullable fields from the
+ * Hive Metastore schema merged in.
+ *
+ * When constructing a DataFrame from a collection of structured data, the resulting object has
+ * a schema corresponding to the union of the fields present in each element of the collection.
+ * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+ * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+ * contain a particular nullable field in its schema despite that field being present in the
+ * table schema obtained from the Hive Metastore. This method returns a schema representing the
+ * Parquet file schema along with any additional nullable fields from the Metastore schema
+ * merged in.
+ */
+ private[parquet] def mergeMissingNullableFields(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+ val missingFields = metastoreSchema
+ .map(_.name.toLowerCase)
+ .diff(parquetSchema.map(_.name.toLowerCase))
+ .map(fieldMap(_))
+ .filter(_.nullable)
+ StructType(parquetSchema ++ missingFields)
+ }
+
+ /**
+ * Figures out a merged Parquet schema with a distributed Spark job.
+ *
+ * Note that locality is not taken into consideration here because:
+ *
+ * 1. For a single Parquet part-file, in most cases the footer only resides in the last block of
+ * that file. Thus we only need to retrieve the location of the last block. However, Hadoop
+ * `FileSystem` only provides API to retrieve locations of all blocks, which can be
+ * potentially expensive.
+ *
+ * 2. This optimization is mainly useful for S3, where file metadata operations can be pretty
+ * slow. And basically locality is not available when using S3 (you can't run computation on
+ * S3 nodes).
+ */
+ def mergeSchemasInParallel(
+ filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
+ val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+ val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+ val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+ val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+
+ // HACK ALERT:
+ //
+ // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
+ // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
+ // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well
+ // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
+ // facts virtually prevent us from serializing `FileStatus`es.
+ //
+ // Since Parquet only relies on path and length information of those `FileStatus`es to read
+ // footers, here we just extract them (which can be easily serialized), send them to executor
+ // side, and reassemble fake `FileStatus`es there.
+ val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+
+ // Issues a Spark job to read Parquet schema in parallel.
+ val partiallyMergedSchemas =
+ sqlContext
+ .sparkContext
+ .parallelize(partialFileStatusInfo)
+ .mapPartitions { iterator =>
+ // Reassembles fake `FileStatus`es from the serialized path and length information.
+ val fakeFileStatuses = iterator.map { case (path, length) =>
+ new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
+ }.toSeq
+
+ // Skips row group information since we only need the schema
+ val skipRowGroups = true
+
+ // Reads footers in multi-threaded manner within each task
+ val footers =
+ ParquetFileReader.readAllFootersInParallel(
+ serializedConf.value, fakeFileStatuses, skipRowGroups)
+
+ // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
+ val converter =
+ new CatalystSchemaConverter(
+ assumeBinaryIsString = assumeBinaryIsString,
+ assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+ followParquetFormatSpec = followParquetFormatSpec)
+
+ footers.map { footer =>
+ ParquetRelation.readSchemaFromFooter(footer, converter)
+ }.reduceOption(_ merge _).iterator
+ }.collect()
+
+ partiallyMergedSchemas.reduceOption(_ merge _)
+ }
+
+ /**
+ * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string
+ * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
+ * a [[StructType]] converted from the [[MessageType]] stored in this footer.
+ */
+ def readSchemaFromFooter(
+ footer: Footer, converter: CatalystSchemaConverter): StructType = {
+ val fileMetaData = footer.getParquetMetadata.getFileMetaData
+ fileMetaData
+ .getKeyValueMetaData
+ .toMap
+ .get(CatalystReadSupport.SPARK_METADATA_KEY)
+ .flatMap(deserializeSchemaString)
+ .getOrElse(converter.convert(fileMetaData.getSchema))
+ }
+
+ private def deserializeSchemaString(schemaString: String): Option[StructType] = {
+ // Tries to deserialize the schema string as JSON first, then falls back to the case class
+ // string parser (data generated by older versions of Spark SQL uses this format).
+ Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
+ case _: Throwable =>
+ logInfo(
+ s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "falling back to the deprecated DataType.fromCaseClassString parser.")
+ DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
+ }.recoverWith {
+ case cause: Throwable =>
+ logWarning(
+ "Failed to parse and ignored serialized Spark schema in " +
+ s"Parquet key-value metadata:\n\t$schemaString", cause)
+ Failure(cause)
+ }.toOption
+ }
def enableLogForwarding() {
// Note: the org.apache.parquet.Log class has a static initializer that
@@ -127,12 +766,6 @@ private[sql] object ParquetRelation {
JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
}
- // The element type for the RDDs that this relation maps to.
- type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-
- // The compression type
- type CompressionType = org.apache.parquet.hadoop.metadata.CompressionCodecName
-
// The parquet compression short names
val shortParquetCompressionCodecNames = Map(
"NONE" -> CompressionCodecName.UNCOMPRESSED,
@@ -140,82 +773,4 @@ private[sql] object ParquetRelation {
"SNAPPY" -> CompressionCodecName.SNAPPY,
"GZIP" -> CompressionCodecName.GZIP,
"LZO" -> CompressionCodecName.LZO)
-
- /**
- * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
- * this is used inside [[org.apache.spark.sql.execution.SparkStrategies SparkStrategies]] to
- * create a resolved relation as a data sink for writing to a Parquetfile. The relation is empty
- * but is initialized with ParquetMetadata and can be inserted into.
- *
- * @param pathString The directory the Parquetfile will be stored in.
- * @param child The child node that will be used for extracting the schema.
- * @param conf A configuration to be used.
- * @return An empty ParquetRelation with inferred metadata.
- */
- def create(pathString: String,
- child: LogicalPlan,
- conf: Configuration,
- sqlContext: SQLContext): ParquetRelation = {
- if (!child.resolved) {
- throw new UnresolvedException[LogicalPlan](
- child,
- "Attempt to create Parquet table from unresolved child (when schema is not available)")
- }
- createEmpty(pathString, child.output, false, conf, sqlContext)
- }
-
- /**
- * Creates an empty ParquetRelation and underlying Parquetfile that only
- * consists of the Metadata for the given schema.
- *
- * @param pathString The directory the Parquetfile will be stored in.
- * @param attributes The schema of the relation.
- * @param conf A configuration to be used.
- * @return An empty ParquetRelation.
- */
- def createEmpty(pathString: String,
- attributes: Seq[Attribute],
- allowExisting: Boolean,
- conf: Configuration,
- sqlContext: SQLContext): ParquetRelation = {
- val path = checkPath(pathString, allowExisting, conf)
- conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
- sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
- .name())
- ParquetRelation.enableLogForwarding()
- // This is a hack. We always set nullable/containsNull/valueContainsNull to true
- // for the schema of a parquet data.
- val schema = StructType.fromAttributes(attributes).asNullable
- val newAttributes = schema.toAttributes
- ParquetTypesConverter.writeMetaData(newAttributes, path, conf)
- new ParquetRelation(path.toString, Some(conf), sqlContext) {
- override val output = newAttributes
- }
- }
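The nullability hack above coerces every field to nullable before the metadata is written. A minimal sketch of that coercion for top-level fields, using the public `StructField.copy` rather than the internal `asNullable` helper (nested containsNull/valueContainsNull handling is omitted):

    import org.apache.spark.sql.types._

    // Illustrative only: force all top-level fields to nullable, mirroring the
    // "always nullable" behavior described above.
    def makeTopLevelNullable(schema: StructType): StructType =
      StructType(schema.fields.map(_.copy(nullable = true)))

    val strict = StructType(Seq(StructField("id", IntegerType, nullable = false)))
    assert(makeTopLevelNullable(strict).head.nullable)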
-
- private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
- if (pathStr == null) {
- throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")
- }
- val origPath = new Path(pathStr)
- val fs = origPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException(
- s"Unable to create ParquetRelation: incorrectly formatted path $pathStr")
- }
- val path = origPath.makeQualified(fs)
- if (!allowExisting && fs.exists(path)) {
- sys.error(s"File $pathStr already exists.")
- }
-
- if (fs.exists(path) &&
- !fs.getFileStatus(path)
- .getPermission
- .getUserAction
- .implies(FsAction.READ_WRITE)) {
- throw new IOException(
- s"Unable to create ParquetRelation: path $path not read-writable")
- }
- path
- }
}
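For reference, the path handling that the removed `checkPath` performed boils down to qualifying the path against its `FileSystem` and refusing to reuse an existing location unless explicitly allowed. A minimal sketch with the Hadoop API (permission checking omitted):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Illustrative only: qualify a path and refuse to overwrite existing data.
    def qualify(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
      val raw = new Path(pathStr)
      val fs = raw.getFileSystem(conf)
      val qualified = fs.makeQualified(raw)   // resolves scheme/authority, e.g. hdfs://...
      if (!allowExisting && fs.exists(qualified)) {
        sys.error(s"File $pathStr already exists.")
      }
      qualified
    }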
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
deleted file mode 100644
index 75cbbde4f1..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.IOException
-import java.text.{NumberFormat, SimpleDateFormat}
-import java.util.concurrent.TimeUnit
-import java.util.Date
-
-import scala.collection.JavaConversions._
-import scala.util.Try
-
-import com.google.common.cache.CacheBuilder
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat => NewFileOutputFormat}
-import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, _}
-import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * :: DeveloperApi ::
- * Parquet table scan operator. Imports the file that backs the given
- * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[InternalRow]``.
- */
-private[sql] case class ParquetTableScan(
- attributes: Seq[Attribute],
- relation: ParquetRelation,
- columnPruningPred: Seq[Expression])
- extends LeafNode {
-
- // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
- // by exprId. note: output cannot be transient, see
- // https://issues.apache.org/jira/browse/SPARK-1367
- val output = attributes.map(relation.attributeMap)
-
- // A mapping of ordinals partitionRow -> finalOutput.
- val requestedPartitionOrdinals = {
- val partitionAttributeOrdinals = AttributeMap(relation.partitioningAttributes.zipWithIndex)
-
- attributes.zipWithIndex.flatMap {
- case (attribute, finalOrdinal) =>
- partitionAttributeOrdinals.get(attribute).map(_ -> finalOrdinal)
- }
- }.toArray
-
- protected override def doExecute(): RDD[InternalRow] = {
- import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat
-
- val sc = sqlContext.sparkContext
- val job = new Job(sc.hadoopConfiguration)
- ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
-
- val conf: Configuration = ContextUtil.getConfiguration(job)
-
- relation.path.split(",").foreach { curPath =>
- val qualifiedPath = {
- val path = new Path(curPath)
- path.getFileSystem(conf).makeQualified(path)
- }
- NewFileInputFormat.addInputPath(job, qualifiedPath)
- }
-
- // Store both requested and original schema in `Configuration`
- conf.set(
- RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- ParquetTypesConverter.convertToString(output))
- conf.set(
- RowWriteSupport.SPARK_ROW_SCHEMA,
- ParquetTypesConverter.convertToString(relation.output))
-
- // Store record filtering predicate in `Configuration`
- // Note 1: the input format ignores all predicates that cannot be expressed
- // as simple column predicate filters in Parquet. Here we just record
- // the whole pruning predicate.
- ParquetFilters
- .createRecordFilter(columnPruningPred)
- .map(_.asInstanceOf[FilterPredicateCompat].getFilterPredicate)
- // Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
- .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- conf.setBoolean(
- SQLConf.PARQUET_CACHE_METADATA.key,
- sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true))
-
- // Use task side metadata in parquet
- conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
-
- val baseRDD =
- new org.apache.spark.rdd.NewHadoopRDD(
- sc,
- classOf[FilteringParquetRowInputFormat],
- classOf[Void],
- classOf[InternalRow],
- conf)
-
- if (requestedPartitionOrdinals.nonEmpty) {
- // This check is based on CatalystConverter.createRootConverter.
- val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
-
- // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
- // the `mapPartitionsWithInputSplit` closure below.
- val outputSize = output.size
-
- baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
- val partValue = "([^=]+)=([^=]+)".r
- val partValues =
- split.asInstanceOf[org.apache.parquet.hadoop.ParquetInputSplit]
- .getPath
- .toString
- .split("/")
- .flatMap {
- case partValue(key, value) => Some(key -> value)
- case _ => None
- }.toMap
-
- // Convert the partitioning attributes into the correct types
- val partitionRowValues =
- relation.partitioningAttributes
- .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
-
- if (primitiveRow) {
- new Iterator[InternalRow] {
- def hasNext: Boolean = iter.hasNext
- def next(): InternalRow = {
- // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
- val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
- // Parquet will leave partitioning columns empty, so we fill them in here.
- var i = 0
- while (i < requestedPartitionOrdinals.length) {
- row(requestedPartitionOrdinals(i)._2) =
- partitionRowValues(requestedPartitionOrdinals(i)._1)
- i += 1
- }
- row
- }
- }
- } else {
- // Create a mutable row since we need to fill in values from partition columns.
- val mutableRow = new GenericMutableRow(outputSize)
- new Iterator[InternalRow] {
- def hasNext: Boolean = iter.hasNext
- def next(): InternalRow = {
- // We are using CatalystGroupConverter and it returns a GenericRow.
- // Since GenericRow is not mutable, we just cast it to a Row.
- val row = iter.next()._2.asInstanceOf[InternalRow]
-
- var i = 0
- while (i < row.numFields) {
- mutableRow(i) = row.genericGet(i)
- i += 1
- }
- // Parquet will leave partitioning columns empty, so we fill them in here.
- i = 0
- while (i < requestedPartitionOrdinals.length) {
- mutableRow(requestedPartitionOrdinals(i)._2) =
- partitionRowValues(requestedPartitionOrdinals(i)._1)
- i += 1
- }
- mutableRow
- }
- }
- }
- }
- } else {
- baseRDD.map(_._2)
- }
- }
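The partition handling in `doExecute` above recovers partition column values by matching `key=value` segments of each input split's path. A standalone sketch of that extraction (illustrative only):

    // Standalone sketch of the key=value extraction used above.
    val partValue = "([^=]+)=([^=]+)".r

    def partitionValues(path: String): Map[String, String] =
      path.split("/").flatMap {
        case partValue(key, value) => Some(key -> value)
        case _ => None
      }.toMap

    // Map(year -> 2015, month -> 07)
    println(partitionValues("hdfs://nn/table/year=2015/month=07/part-r-00001.parquet"))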
-
- /**
- * Applies a (candidate) projection.
- *
- * @param prunedAttributes The list of attributes to be used in the projection.
- * @return Pruned TableScan.
- */
- def pruneColumns(prunedAttributes: Seq[Attribute]): ParquetTableScan = {
- val success = validateProjection(prunedAttributes)
- if (success) {
- ParquetTableScan(prunedAttributes, relation, columnPruningPred)
- } else {
- sys.error("Warning: Could not validate Parquet schema projection in pruneColumns")
- }
- }
-
- /**
- * Evaluates a candidate projection by checking whether the candidate is a subtype
- * of the original type.
- *
- * @param projection The candidate projection.
- * @return True if the projection is valid, false otherwise.
- */
- private def validateProjection(projection: Seq[Attribute]): Boolean = {
- val original: MessageType = relation.parquetSchema
- val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection)
- Try(original.checkContains(candidate)).isSuccess
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Operator that acts as a sink for queries on RDDs and can be used to
- * store the output inside a directory of Parquet files. This operator
- * is similar to Hive's INSERT INTO TABLE operation in the sense that
- * one can choose to either overwrite or append to a directory. Note
- * that consecutive insertions to the same table must have compatible
- * (source) schemas.
- *
- * WARNING: EXPERIMENTAL! InsertIntoParquetTable with overwrite=false may
- * cause data corruption in the case that multiple users try to append to
- * the same table simultaneously. Inserting into a table that was
- * previously generated by other means (e.g., by creating an HDFS
- * directory and importing Parquet files generated by other tools) may
- * cause unpredictable behaviour and therefore result in a RuntimeException
- * (this is only detected via a filename pattern, so it will not catch all cases).
- */
-@DeveloperApi
-private[sql] case class InsertIntoParquetTable(
- relation: ParquetRelation,
- child: SparkPlan,
- overwrite: Boolean = false)
- extends UnaryNode with SparkHadoopMapReduceUtil {
-
- /**
- * Inserts all rows into the Parquet file.
- */
- protected override def doExecute(): RDD[InternalRow] = {
- // TODO: currently we do not check whether the "schema"s are compatible
- // That means if one first creates a table and then INSERTs data with
- // an incompatible schema, the execution will fail. It would be nice
- // to catch this early on, maybe by having the planner validate the schema
- // before calling execute().
-
- val childRdd = child.execute()
- assert(childRdd != null)
-
- val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
-
- val writeSupport =
- if (child.output.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
- log.debug("Initializing MutableRowWriteSupport")
- classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
- } else {
- classOf[org.apache.spark.sql.parquet.RowWriteSupport]
- }
-
- ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
-
- val conf = ContextUtil.getConfiguration(job)
- // This is a hack. We always set nullable/containsNull/valueContainsNull to true
- // for the schema of Parquet data.
- val schema = StructType.fromAttributes(relation.output).asNullable
- RowWriteSupport.setSchema(schema.toAttributes, conf)
-
- val fspath = new Path(relation.path)
- val fs = fspath.getFileSystem(conf)
-
- if (overwrite) {
- try {
- fs.delete(fspath, true)
- } catch {
- case e: IOException =>
- throw new IOException(
- s"Unable to clear output directory ${fspath.toString} prior"
- + s" to InsertIntoParquetTable:\n${e.toString}")
- }
- }
- saveAsHadoopFile(childRdd, relation.path.toString, conf)
-
- // We return the child RDD to allow chaining (alternatively, one could return nothing).
- childRdd
- }
-
- override def output: Seq[Attribute] = child.output
-
- /**
- * Stores the given Row RDD as a Hadoop file.
- *
- * Note: We cannot use ``saveAsNewAPIHadoopFile`` from [[org.apache.spark.rdd.PairRDDFunctions]]
- * together with [[org.apache.spark.util.MutablePair]] because ``PairRDDFunctions`` uses
- * ``Tuple2`` and not ``Product2``. Also, we want to allow appending files to an existing
- * directory and need to determine which was the largest written file index before starting to
- * write.
- *
- * @param rdd The [[org.apache.spark.rdd.RDD]] to write.
- * @param path The directory to write to.
- * @param conf A [[org.apache.hadoop.conf.Configuration]].
- */
- private def saveAsHadoopFile(
- rdd: RDD[InternalRow],
- path: String,
- conf: Configuration) {
- val job = new Job(conf)
- val keyType = classOf[Void]
- job.setOutputKeyClass(keyType)
- job.setOutputValueClass(classOf[InternalRow])
- NewFileOutputFormat.setOutputPath(job, new Path(path))
- val wrappedConf = new SerializableConfiguration(job.getConfiguration)
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- val jobtrackerID = formatter.format(new Date())
- val stageId = sqlContext.sparkContext.newRddId()
-
- val taskIdOffset =
- if (overwrite) {
- 1
- } else {
- FileSystemHelper
- .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
- }
-
- def writeShard(context: TaskContext, iter: Iterator[InternalRow]): Int = {
- /* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
- context.attemptNumber)
- val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
- val format = new AppendingParquetOutputFormat(taskIdOffset)
- val committer = format.getOutputCommitter(hadoopContext)
- committer.setupTask(hadoopContext)
- val writer = format.getRecordWriter(hadoopContext)
- try {
- while (iter.hasNext) {
- val row = iter.next()
- writer.write(null, row)
- }
- } finally {
- writer.close(hadoopContext)
- }
- SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
- 1
- }
- val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
- /* apparently we need a TaskAttemptID to construct an OutputCommitter;
- * however we're only going to use this local OutputCommitter for
- * setupJob/commitJob, so we just use a dummy "map" task.
- */
- val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
- val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
- val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
- jobCommitter.setupJob(jobTaskContext)
- sqlContext.sparkContext.runJob(rdd, writeShard _)
- jobCommitter.commitJob(jobTaskContext)
- }
-}
-
-/**
- * TODO: this will be able to append to directories it created itself, not necessarily
- * to imported ones.
- */
-private[parquet] class AppendingParquetOutputFormat(offset: Int)
- extends org.apache.parquet.hadoop.ParquetOutputFormat[InternalRow] {
- // override to accept existing directories as valid output directory
- override def checkOutputSpecs(job: JobContext): Unit = {}
- var committer: OutputCommitter = null
-
- // override to choose the output filename so as not to overwrite existing ones
- override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val numfmt = NumberFormat.getInstance()
- numfmt.setMinimumIntegerDigits(5)
- numfmt.setGroupingUsed(false)
-
- val taskId: TaskID = getTaskAttemptID(context).getTaskID
- val partition: Int = taskId.getId
- val filename = "part-r-" + numfmt.format(partition + offset) + ".parquet"
- val committer: FileOutputCommitter =
- getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
- new Path(committer.getWorkPath, filename)
- }
-
- // The TaskAttemptContext is a class in hadoop-1 but is an interface in hadoop-2.
- // The signatures of the method TaskAttemptContext.getTaskAttemptID for both versions
- // are the same, so the method calls are source-compatible but NOT binary-compatible because
- // the opcode of method call for class is INVOKEVIRTUAL and for interface is INVOKEINTERFACE.
- private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
- context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
- }
-
- // override to create output committer from configuration
- override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- if (committer == null) {
- val output = getOutputPath(context)
- val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
- classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
- val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
- }
- committer
- }
-
- // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
- private def getOutputPath(context: TaskAttemptContext): Path = {
- context.getConfiguration().get("mapred.output.dir") match {
- case null => null
- case name => new Path(name)
- }
- }
-}
-
-// TODO Remove this class after removing the old Parquet support code
-/**
- * We extend ParquetInputFormat in order to have more control over which
- * RecordFilter we want to use.
- */
-private[parquet] class FilteringParquetRowInputFormat
- extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging {
-
- override def createRecordReader(
- inputSplit: InputSplit,
- taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
-
- import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter
-
- val readSupport: ReadSupport[InternalRow] = new RowReadSupport()
-
- val filter = ParquetInputFormat.getFilter(ContextUtil.getConfiguration(taskAttemptContext))
- if (!filter.isInstanceOf[NoOpFilter]) {
- new ParquetRecordReader[InternalRow](
- readSupport,
- filter)
- } else {
- new ParquetRecordReader[InternalRow](readSupport)
- }
- }
-
-}
-
-private[parquet] object FileSystemHelper {
- def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
- val origPath = new Path(pathStr)
- val fs = origPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException(
- s"ParquetTableOperations: Path $origPath is incorrectly formatted")
- }
- val path = origPath.makeQualified(fs)
- if (!fs.exists(path) || !fs.getFileStatus(path).isDir) {
- throw new IllegalArgumentException(
- s"ParquetTableOperations: path $path does not exist or is not a directory")
- }
- fs.globStatus(path)
- .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
- .map(_.getPath)
- }
-
- /**
- * Finds the maximum taskid in the output file names at the given path.
- */
- def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
- val files = FileSystemHelper.listFiles(pathStr, conf)
- // filename pattern is part-r-<int>.parquet
- val nameP = new scala.util.matching.Regex("""part-.-(\d{1,}).*""", "taskid")
- val hiddenFileP = new scala.util.matching.Regex("_.*")
- files.map(_.getName).map {
- case nameP(taskid) => taskid.toInt
- case hiddenFileP() => 0
- case other: String =>
- sys.error("ERROR: attempting to append to set of Parquet files and found file" +
- s"that does not match name pattern: $other")
- case _ => 0
- }.reduceOption(_ max _).getOrElse(0)
- }
-}
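`findMaxTaskId` above derives the next task id for appends from existing `part-r-<id>.parquet` file names. A compact sketch of that logic (illustrative only; hidden files and malformed names are simply skipped here):

    // Sketch of choosing the next append offset from existing part-file names.
    val nameP = """part-.-(\d{1,}).*""".r

    def nextTaskId(fileNames: Seq[String]): Int =
      fileNames.collect { case nameP(taskid) => taskid.toInt }
        .reduceOption(_ max _).getOrElse(0) + 1

    // part-r-00000.parquet and part-r-00007.parquet => 8
    println(nextTaskId(Seq("part-r-00000.parquet", "part-r-00007.parquet")))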
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 7b6a7f65d6..fc9f61a636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -18,18 +18,13 @@
package org.apache.spark.sql.parquet
import java.nio.{ByteBuffer, ByteOrder}
-import java.util
import java.util.{HashMap => JHashMap}
-import scala.collection.JavaConversions._
-
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.hadoop.api.{InitContext, ReadSupport, WriteSupport}
+import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api._
-import org.apache.parquet.schema.MessageType
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
@@ -39,147 +34,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
- * A [[RecordMaterializer]] for Catalyst rows.
- *
- * @param parquetSchema Parquet schema of the records to be read
- * @param catalystSchema Catalyst schema of the rows to be constructed
- */
-private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, catalystSchema: StructType)
- extends RecordMaterializer[InternalRow] {
-
- private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
-
- override def getCurrentRecord: InternalRow = rootConverter.currentRow
-
- override def getRootConverter: GroupConverter = rootConverter
-}
-
-private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logging {
- override def prepareForRead(
- conf: Configuration,
- keyValueMetaData: util.Map[String, String],
- fileSchema: MessageType,
- readContext: ReadContext): RecordMaterializer[InternalRow] = {
- log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
- val toCatalyst = new CatalystSchemaConverter(conf)
- val parquetRequestedSchema = readContext.getRequestedSchema
-
- val catalystRequestedSchema =
- Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
- metadata
- // First tries to read requested schema, which may result from projections
- .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
- // If not available, tries to read Catalyst schema from file metadata. It's only
- // available if the target file is written by Spark SQL.
- .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
- }.map(StructType.fromString).getOrElse {
- logDebug("Catalyst schema not available, falling back to Parquet schema")
- toCatalyst.convert(parquetRequestedSchema)
- }
-
- logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
- new RowRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
- }
-
- override def init(context: InitContext): ReadContext = {
- val conf = context.getConfiguration
-
- // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
- // schema of this file in its metadata.
- val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
- // Optional schema of requested columns, in the form of a string serialized from a Catalyst
- // `StructType` containing all requested columns.
- val maybeRequestedSchema = Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
- // Below we construct a Parquet schema containing all requested columns. This schema tells
- // Parquet which columns to read.
- //
- // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
- // we have to fall back to the full file schema, which contains all columns in the file.
- // Obviously this may waste IO bandwidth since it may read more columns than requested.
- //
- // Two things to note:
- //
- // 1. It's possible that some requested columns don't exist in the target Parquet file. For
- // example, in the case of schema merging, the globally merged schema may contain extra
- // columns gathered from other Parquet files. These columns will be simply filled with nulls
- // when actually reading the target Parquet file.
- //
- // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
- // Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
- // non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
- // containing a single integer array field `f1` may have the following legacy 2-level
- // structure:
- //
- // message root {
- // optional group f1 (LIST) {
- // required INT32 element;
- // }
- // }
- //
- // while `CatalystSchemaConverter` may generate a standard 3-level structure:
- //
- // message root {
- // optional group f1 (LIST) {
- // repeated group list {
- // required INT32 element;
- // }
- // }
- // }
- //
- // Apparently, we can't use the 2nd schema to read the target Parquet file as they have
- // different physical structures.
- val parquetRequestedSchema =
- maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
- val toParquet = new CatalystSchemaConverter(conf)
- val fileSchema = context.getFileSchema.asGroupType()
- val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
- StructType
- // Deserializes the Catalyst schema of requested columns
- .fromString(schemaString)
- .map { field =>
- if (fileFieldNames.contains(field.name)) {
- // If the field exists in the target Parquet file, extracts the field type from the
- // full file schema and makes a single-field Parquet schema
- new MessageType("root", fileSchema.getType(field.name))
- } else {
- // Otherwise, just resorts to `CatalystSchemaConverter`
- toParquet.convert(StructType(Array(field)))
- }
- }
- // Merges all single-field Parquet schemas to form a complete schema for all requested
- // columns. Note that it's possible that no columns are requested at all (e.g., count
- // some partition column of a partitioned Parquet table). That's why `fold` is used here
- // and always fallback to an empty Parquet schema.
- .fold(new MessageType("root")) {
- _ union _
- }
- }
-
- val metadata =
- Map.empty[String, String] ++
- maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
- maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
- logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
- new ReadContext(parquetRequestedSchema, metadata)
- }
-}
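The comment block in `init` above explains why the requested Parquet schema is stitched together one field at a time from the file schema (to preserve legacy 2-level LIST layouts) and then unioned. A rough sketch of that union step with parquet-mr's schema classes, which the removed code also relies on (column names invented):

    import org.apache.parquet.schema.{MessageType, MessageTypeParser}

    // Rough sketch: build one single-field schema per requested column, then
    // union them into the schema handed to Parquet (illustrative only).
    val perField: Seq[MessageType] = Seq(
      MessageTypeParser.parseMessageType("message root { optional int32 id; }"),
      MessageTypeParser.parseMessageType("message root { optional binary name (UTF8); }"))

    val requested: MessageType =
      perField.fold(new MessageType("root")) { (left, right) => left.union(right) }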
-
-private[parquet] object RowReadSupport {
- val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
- val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
-
- private def getRequestedSchema(configuration: Configuration): Seq[Attribute] = {
- val schemaString = configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
- if (schemaString == null) null else ParquetTypesConverter.convertFromString(schemaString)
- }
-}
-
-/**
* A `parquet.hadoop.api.WriteSupport` for Row objects.
*/
private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging {
@@ -190,7 +44,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
override def init(configuration: Configuration): WriteSupport.WriteContext = {
val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
val metadata = new JHashMap[String, String]()
- metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+ metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr)
if (attributes == null) {
attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray
@@ -443,4 +297,3 @@ private[parquet] object RowWriteSupport {
ParquetProperties.WriterVersion.PARQUET_1_0.toString)
}
}
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index e748bd7857..3854f5bd39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -53,15 +53,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
length
}
- def convertToAttributes(
- parquetSchema: MessageType,
- isBinaryAsString: Boolean,
- isInt96AsTimestamp: Boolean): Seq[Attribute] = {
- val converter = new CatalystSchemaConverter(
- isBinaryAsString, isInt96AsTimestamp, followParquetFormatSpec = false)
- converter.convert(parquetSchema).toAttributes
- }
-
def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
val converter = new CatalystSchemaConverter()
converter.convert(StructType.fromAttributes(attributes))
@@ -103,7 +94,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
val extraMetadata = new java.util.HashMap[String, String]()
extraMetadata.put(
- RowReadSupport.SPARK_METADATA_KEY,
+ CatalystReadSupport.SPARK_METADATA_KEY,
ParquetTypesConverter.convertToString(attributes))
// TODO: add extra data, e.g., table name, date, etc.?
@@ -165,35 +156,4 @@ private[parquet] object ParquetTypesConverter extends Logging {
.getOrElse(
throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
}
-
- /**
- * Reads in Parquet Metadata from the given path and tries to extract the schema
- * (Catalyst attributes) from the application-specific key-value map. If this
- * is empty it falls back to converting from the Parquet file schema which
- * may lead to an upcast of types (e.g., {byte, short} to int).
- *
- * @param origPath The path at which we expect one (or more) Parquet files.
- * @param conf The Hadoop configuration to use.
- * @return A list of attributes that make up the schema.
- */
- def readSchemaFromFile(
- origPath: Path,
- conf: Option[Configuration],
- isBinaryAsString: Boolean,
- isInt96AsTimestamp: Boolean): Seq[Attribute] = {
- val keyValueMetadata: java.util.Map[String, String] =
- readMetaData(origPath, conf)
- .getFileMetaData
- .getKeyValueMetaData
- if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
- convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
- } else {
- val attributes = convertToAttributes(
- readMetaData(origPath, conf).getFileMetaData.getSchema,
- isBinaryAsString,
- isInt96AsTimestamp)
- log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
- attributes
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
deleted file mode 100644
index 8ec228c2b2..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ /dev/null
@@ -1,732 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.net.URI
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Try}
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
-import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
- override def createRelation(
- sqlContext: SQLContext,
- paths: Array[String],
- schema: Option[StructType],
- partitionColumns: Option[StructType],
- parameters: Map[String, String]): HadoopFsRelation = {
- new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext)
- }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
- extends OutputWriterInternal {
-
- private val recordWriter: RecordWriter[Void, InternalRow] = {
- val outputFormat = {
- new ParquetOutputFormat[InternalRow]() {
- // Here we override `getDefaultWorkFile` for two reasons:
- //
- // 1. To allow appending. We need to generate unique output file names to avoid
- // overwriting existing files (files that either existed before the write job, or were
- // just written by other tasks within the same write job).
- //
- // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
- // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
- // partitions in the case of dynamic partitioning.
- override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
- }
- }
- }
-
- outputFormat.getRecordWriter(context)
- }
-
- override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
-
- override def close(): Unit = recordWriter.close(context)
-}
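The `getDefaultWorkFile` override above makes part-file names unique per task and per write job so that appends and dynamically partitioned writes never collide. Illustrative only, with a made-up job UUID:

    // Illustrative only: the part-file naming scheme used above.
    val split = 42                                   // task id within the write job
    val uniqueWriteJobId = "e3b0c442-example-uuid"   // hypothetical job UUID
    val extension = ".gz.parquet"
    val fileName = f"part-r-$split%05d-$uniqueWriteJobId$extension"
    // => part-r-00042-e3b0c442-example-uuid.gz.parquet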
-
-private[sql] class ParquetRelation2(
- override val paths: Array[String],
- private val maybeDataSchema: Option[StructType],
- // This is for metastore conversion.
- private val maybePartitionSpec: Option[PartitionSpec],
- override val userDefinedPartitionColumns: Option[StructType],
- parameters: Map[String, String])(
- val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec)
- with Logging {
-
- private[sql] def this(
- paths: Array[String],
- maybeDataSchema: Option[StructType],
- maybePartitionSpec: Option[PartitionSpec],
- parameters: Map[String, String])(
- sqlContext: SQLContext) = {
- this(
- paths,
- maybeDataSchema,
- maybePartitionSpec,
- maybePartitionSpec.map(_.partitionColumns),
- parameters)(sqlContext)
- }
-
- // Should we merge schemas from all Parquet part-files?
- private val shouldMergeSchemas =
- parameters
- .get(ParquetRelation2.MERGE_SCHEMA)
- .map(_.toBoolean)
- .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
-
- private val maybeMetastoreSchema = parameters
- .get(ParquetRelation2.METASTORE_SCHEMA)
- .map(DataType.fromJson(_).asInstanceOf[StructType])
-
- private lazy val metadataCache: MetadataCache = {
- val meta = new MetadataCache
- meta.refresh()
- meta
- }
-
- override def equals(other: Any): Boolean = other match {
- case that: ParquetRelation2 =>
- val schemaEquality = if (shouldMergeSchemas) {
- this.shouldMergeSchemas == that.shouldMergeSchemas
- } else {
- this.dataSchema == that.dataSchema &&
- this.schema == that.schema
- }
-
- this.paths.toSet == that.paths.toSet &&
- schemaEquality &&
- this.maybeDataSchema == that.maybeDataSchema &&
- this.partitionColumns == that.partitionColumns
-
- case _ => false
- }
-
- override def hashCode(): Int = {
- if (shouldMergeSchemas) {
- Objects.hashCode(
- Boolean.box(shouldMergeSchemas),
- paths.toSet,
- maybeDataSchema,
- partitionColumns)
- } else {
- Objects.hashCode(
- Boolean.box(shouldMergeSchemas),
- paths.toSet,
- dataSchema,
- schema,
- maybeDataSchema,
- partitionColumns)
- }
- }
-
- /** Constraints on schema of dataframe to be stored. */
- private def checkConstraints(schema: StructType): Unit = {
- if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
- val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
- case (x, ys) if ys.length > 1 => "\"" + x + "\""
- }.mkString(", ")
- throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
- s"cannot save to parquet format")
- }
- }
-
- override def dataSchema: StructType = {
- val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
- // check if schema satisfies the constraints
- // before moving forward
- checkConstraints(schema)
- schema
- }
-
- override private[sql] def refresh(): Unit = {
- super.refresh()
- metadataCache.refresh()
- }
-
- // Parquet data source always uses Catalyst internal representations.
- override val needConversion: Boolean = false
-
- override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
-
- override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- val conf = ContextUtil.getConfiguration(job)
-
- val committerClass =
- conf.getClass(
- SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
- classOf[ParquetOutputCommitter],
- classOf[ParquetOutputCommitter])
-
- if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
- logInfo("Using default output committer for Parquet: " +
- classOf[ParquetOutputCommitter].getCanonicalName)
- } else {
- logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
- }
-
- conf.setClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key,
- committerClass,
- classOf[ParquetOutputCommitter])
-
- // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
- // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
- // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
- // bundled with `ParquetOutputFormat[Row]`.
- job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
- // TODO There's no need to use two kinds of WriteSupport
- // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
- // complex types.
- val writeSupportClass =
- if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
- classOf[MutableRowWriteSupport]
- } else {
- classOf[RowWriteSupport]
- }
-
- ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
- RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
-
- // Sets compression scheme
- conf.set(
- ParquetOutputFormat.COMPRESSION,
- ParquetRelation
- .shortParquetCompressionCodecNames
- .getOrElse(
- sqlContext.conf.parquetCompressionCodec.toUpperCase,
- CompressionCodecName.UNCOMPRESSED).name())
-
- new OutputWriterFactory {
- override def newInstance(
- path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
- new ParquetOutputWriter(path, context)
- }
- }
- }
-
- override def buildScan(
- requiredColumns: Array[String],
- filters: Array[Filter],
- inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
- val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
- val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
- val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
- val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
-
- // Create the function to set variable Parquet confs at both driver and executor side.
- val initLocalJobFuncOpt =
- ParquetRelation2.initializeLocalJobFunc(
- requiredColumns,
- filters,
- dataSchema,
- useMetadataCache,
- parquetFilterPushDown,
- assumeBinaryIsString,
- assumeInt96IsTimestamp,
- followParquetFormatSpec) _
-
- // Create the function to set input paths at the driver side.
- val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
-
- Utils.withDummyCallSite(sqlContext.sparkContext) {
- new SqlNewHadoopRDD(
- sc = sqlContext.sparkContext,
- broadcastedConf = broadcastedConf,
- initDriverSideJobFuncOpt = Some(setInputPaths),
- initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
- inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
- keyClass = classOf[Void],
- valueClass = classOf[InternalRow]) {
-
- val cacheMetadata = useMetadataCache
-
- @transient val cachedStatuses = inputFiles.map { f =>
- // In order to encode the authority of a Path containing special characters such as '/'
- // (which does happen in some S3N credentials), we need to use the string returned by the
- // URI of the path to create a new Path.
- val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
- new FileStatus(
- f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
- f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
- }.toSeq
-
- private def escapePathUserInfo(path: Path): Path = {
- val uri = path.toUri
- new Path(new URI(
- uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
- uri.getQuery, uri.getFragment))
- }
-
- // Overridden so we can inject our own cached file statuses.
- override def getPartitions: Array[SparkPartition] = {
- val inputFormat = new ParquetInputFormat[InternalRow] {
- override def listStatus(jobContext: JobContext): JList[FileStatus] = {
- if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
- }
- }
-
- val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
- val rawSplits = inputFormat.getSplits(jobContext)
-
- Array.tabulate[SparkPartition](rawSplits.size) { i =>
- new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
- }
- }
- }.values.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
- }
- }
-
- private class MetadataCache {
- // `FileStatus` objects of all "_metadata" files.
- private var metadataStatuses: Array[FileStatus] = _
-
- // `FileStatus` objects of all "_common_metadata" files.
- private var commonMetadataStatuses: Array[FileStatus] = _
-
- // `FileStatus` objects of all data files (Parquet part-files).
- var dataStatuses: Array[FileStatus] = _
-
- // Schema of the actual Parquet files, without partition columns discovered from partition
- // directory paths.
- var dataSchema: StructType = null
-
- // Schema of the whole table, including partition columns.
- var schema: StructType = _
-
- // Cached leaves
- var cachedLeaves: Set[FileStatus] = null
-
- /**
- * Refreshes `FileStatus`es, footers, partition spec, and table schema.
- */
- def refresh(): Unit = {
- val currentLeafStatuses = cachedLeafStatuses()
-
- // Check if cachedLeafStatuses is changed or not
- val leafStatusesChanged = (cachedLeaves == null) ||
- !cachedLeaves.equals(currentLeafStatuses)
-
- if (leafStatusesChanged) {
- cachedLeaves = currentLeafStatuses.toIterator.toSet
-
- // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
- val leaves = currentLeafStatuses.filter { f =>
- isSummaryFile(f.getPath) ||
- !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }.toArray
-
- dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
- metadataStatuses =
- leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
- commonMetadataStatuses =
- leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
- dataSchema = {
- val dataSchema0 = maybeDataSchema
- .orElse(readSchema())
- .orElse(maybeMetastoreSchema)
- .getOrElse(throw new AnalysisException(
- s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
- paths.mkString("\n\t")))
-
- // If this Parquet relation is converted from a Hive Metastore table, we must reconcile
- // case insensitivity issues and possible schema mismatches (probably caused by schema
- // evolution).
- maybeMetastoreSchema
- .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
- .getOrElse(dataSchema0)
- }
- }
- }
-
- private def isSummaryFile(file: Path): Boolean = {
- file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
- file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
- }
-
- private def readSchema(): Option[StructType] = {
- // Sees which file(s) we need to touch in order to figure out the schema.
- //
- // Always tries the summary files first if users don't require a merged schema. In this case,
- // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
- // groups information, and could be much smaller for large Parquet files with lots of row
- // groups. If no summary file is available, falls back to some random part-file.
- //
- // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
- // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
- // how to merge them correctly if some key is associated with different values in different
- // part-files. When this happens, Parquet simply gives up generating the summary file. This
- // implies that if a summary file is present, then:
- //
- // 1. Either all part-files have exactly the same Spark SQL schema, or
- // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
- // their schemas may differ from each other).
- //
- // Here we tend to be pessimistic and take the second case into account. Basically this means
- // we can't trust the summary files if users require a merged schema, and must touch all part-
- // files to do the merge.
- val filesToTouch =
- if (shouldMergeSchemas) {
- // Also includes summary files, because there might be empty partition directories.
- (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
- } else {
- // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
- // don't have this.
- commonMetadataStatuses.headOption
- // Falls back to "_metadata"
- .orElse(metadataStatuses.headOption)
- // Summary file(s) not found, the Parquet file is either corrupted, or different part-
- // files contain conflicting user defined metadata (two or more values are associated
- // with the same key in different files). In either case, we fall back to the first
- // part-file, and just assume all schemas are consistent.
- .orElse(dataStatuses.headOption)
- .toSeq
- }
-
- assert(
- filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
- "No predefined schema found, " +
- s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
-
- ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
- }
- }
-}
-
-private[sql] object ParquetRelation2 extends Logging {
- // Whether we should merge schemas collected from all Parquet part-files.
- private[sql] val MERGE_SCHEMA = "mergeSchema"
-
- // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
- // internally.
- private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
- /** This closure sets various Parquet configurations at both driver side and executor side. */
- private[parquet] def initializeLocalJobFunc(
- requiredColumns: Array[String],
- filters: Array[Filter],
- dataSchema: StructType,
- useMetadataCache: Boolean,
- parquetFilterPushDown: Boolean,
- assumeBinaryIsString: Boolean,
- assumeInt96IsTimestamp: Boolean,
- followParquetFormatSpec: Boolean)(job: Job): Unit = {
- val conf = job.getConfiguration
- conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName)
-
- // Try to push down filters when filter push-down is enabled.
- if (parquetFilterPushDown) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(dataSchema, _))
- .reduceOption(FilterApi.and)
- .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
- }
-
- conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
- val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
- ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
- })
-
- conf.set(
- RowWriteSupport.SPARK_ROW_SCHEMA,
- ParquetTypesConverter.convertToString(dataSchema.toAttributes))
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
-
- // Sets flags for Parquet schema conversion
- conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
- conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
- conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
- }
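The filter push-down above converts data source `Filter`s into Parquet `FilterPredicate`s and ANDs them together. A minimal sketch of that combination step using parquet-mr's `FilterApi` directly (column names invented):

    import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

    // Minimal sketch: AND together a couple of hand-built Parquet predicates,
    // analogous to flatMap + reduceOption(FilterApi.and) above (illustrative only).
    val byId: FilterPredicate =
      FilterApi.eq(FilterApi.intColumn("id"), java.lang.Integer.valueOf(1))
    val byAge: FilterPredicate =
      FilterApi.gt(FilterApi.intColumn("age"), java.lang.Integer.valueOf(18))

    val combined: Option[FilterPredicate] =
      Seq(byId, byAge).reduceOption(FilterApi.and)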
-
- /** This closure sets input paths at the driver side. */
- private[parquet] def initializeDriverSideJobFunc(
- inputFiles: Array[FileStatus])(job: Job): Unit = {
- // We set the input paths at the driver side.
- logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
- if (inputFiles.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
- }
- }
-
- private[parquet] def readSchema(
- footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-
- def parseParquetSchema(schema: MessageType): StructType = {
- StructType.fromAttributes(
- // TODO Really no need to use `Attribute` here, we only need to know the data type.
- ParquetTypesConverter.convertToAttributes(
- schema,
- sqlContext.conf.isParquetBinaryAsString,
- sqlContext.conf.isParquetINT96AsTimestamp))
- }
-
- val seen = mutable.HashSet[String]()
- val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
- val metadata = footer.getParquetMetadata.getFileMetaData
- val serializedSchema = metadata
- .getKeyValueMetaData
- .toMap
- .get(RowReadSupport.SPARK_METADATA_KEY)
- if (serializedSchema.isEmpty) {
- // Falls back to Parquet schema if no Spark SQL schema found.
- Some(parseParquetSchema(metadata.getSchema))
- } else if (!seen.contains(serializedSchema.get)) {
- seen += serializedSchema.get
-
- // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
- // whatever is available.
- Some(Try(DataType.fromJson(serializedSchema.get))
- .recover { case _: Throwable =>
- logInfo(
- s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
- "falling back to the deprecated DataType.fromCaseClassString parser.")
- DataType.fromCaseClassString(serializedSchema.get)
- }
- .recover { case cause: Throwable =>
- logWarning(
- s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
- |\t$serializedSchema
- """.stripMargin,
- cause)
- }
- .map(_.asInstanceOf[StructType])
- .getOrElse {
- // Falls back to Parquet schema if Spark SQL schema can't be parsed.
- parseParquetSchema(metadata.getSchema)
- })
- } else {
- None
- }
- }
-
- finalSchemas.reduceOption { (left, right) =>
- try left.merge(right) catch { case e: Throwable =>
- throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
- }
- }
- }
-
- /**
- * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
- * schema and Parquet schema.
- *
- * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
- * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
- * distinguish binary and string). This method generates a correct schema by merging Metastore
- * schema data types and Parquet schema field names.
- */
- private[parquet] def mergeMetastoreParquetSchema(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- def schemaConflictMessage: String =
- s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
- |${metastoreSchema.prettyJson}
- |
- |Parquet schema:
- |${parquetSchema.prettyJson}
- """.stripMargin
-
- val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
- assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
- val ordinalMap = metastoreSchema.zipWithIndex.map {
- case (field, index) => field.name.toLowerCase -> index
- }.toMap
-
- val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
- ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
- StructType(metastoreSchema.zip(reorderedParquetSchema).map {
- // Uses Parquet field names but retains Metastore data types.
- case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
- mSchema.copy(name = pSchema.name)
- case _ =>
- throw new SparkException(schemaConflictMessage)
- })
- }
-
- /**
- * Returns the original schema from the Parquet file with any missing nullable fields from the
- * Hive Metastore schema merged in.
- *
- * When constructing a DataFrame from a collection of structured data, the resulting object has
- * a schema corresponding to the union of the fields present in each element of the collection.
- * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
- * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
- * contain a particular nullable field in its schema despite that field being present in the
- * table schema obtained from the Hive Metastore. This method returns a schema representing the
- * Parquet file schema along with any additional nullable fields from the Metastore schema
- * merged in.
- */
- private[parquet] def mergeMissingNullableFields(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
- val missingFields = metastoreSchema
- .map(_.name.toLowerCase)
- .diff(parquetSchema.map(_.name.toLowerCase))
- .map(fieldMap(_))
- .filter(_.nullable)
- StructType(parquetSchema ++ missingFields)
- }
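Taken together, `mergeMetastoreParquetSchema` and `mergeMissingNullableFields` keep Parquet's field casing, the Metastore's data types, and any nullable columns missing from a given part-file. A small worked example with invented field names:

    import org.apache.spark.sql.types._

    // Worked example (invented names): the Metastore lower-cases field names and may
    // carry extra nullable columns that a given Parquet part-file lacks.
    val metastoreSchema = StructType(Seq(
      StructField("userid", LongType, nullable = true),
      StructField("username", StringType, nullable = true),
      StructField("lastlogin", TimestampType, nullable = true)))

    val parquetSchema = StructType(Seq(
      StructField("userId", LongType, nullable = true),
      StructField("userName", StringType, nullable = true)))

    // The expected merged result keeps Parquet's casing, the Metastore's types,
    // and re-adds the missing nullable column:
    //   StructType(userId: long, userName: string, lastlogin: timestamp)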
-
- /**
- * Figures out a merged Parquet schema with a distributed Spark job.
- *
- * Note that locality is not taken into consideration here because:
- *
- * 1. For a single Parquet part-file, in most cases the footer only resides in the last block of
- * that file. Thus we only need to retrieve the location of the last block. However, Hadoop
- * `FileSystem` only provides API to retrieve locations of all blocks, which can be
- * potentially expensive.
- *
- * 2. This optimization is mainly useful for S3, where file metadata operations can be pretty
- * slow. And basically locality is not available when using S3 (you can't run computation on
- * S3 nodes).
- */
- def mergeSchemasInParallel(
- filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
- val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
- val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
- val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
-
- // HACK ALERT:
- //
- // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
- // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
- // but only `Writable`. What makes it worse, for some reason `FileStatus` doesn't play well
- // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
- // facts virtually prevent us from serializing `FileStatus`es.
- //
- // Since Parquet only relies on path and length information of those `FileStatus`es to read
- // footers, here we just extract them (which can be easily serialized), send them to the
- // executor side, and reassemble fake `FileStatus`es there.
- val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
-
- // Issues a Spark job to read Parquet schema in parallel.
- val partiallyMergedSchemas =
- sqlContext
- .sparkContext
- .parallelize(partialFileStatusInfo)
- .mapPartitions { iterator =>
- // Reconstructs fake `FileStatus`es from the serialized path and length information.
- val fakeFileStatuses = iterator.map { case (path, length) =>
- new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
- }.toSeq
-
- // Skips row group information since we only need the schema
- val skipRowGroups = true
-
- // Reads footers in a multi-threaded manner within each task
- val footers =
- ParquetFileReader.readAllFootersInParallel(
- serializedConf.value, fakeFileStatuses, skipRowGroups)
-
- // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
- val converter =
- new CatalystSchemaConverter(
- assumeBinaryIsString = assumeBinaryIsString,
- assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- followParquetFormatSpec = followParquetFormatSpec)
-
- footers.map { footer =>
- ParquetRelation2.readSchemaFromFooter(footer, converter)
- }.reduceOption(_ merge _).iterator
- }.collect()
-
- partiallyMergedSchemas.reduceOption(_ merge _)
- }
-
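
A hedged usage sketch of the helper above. The directory path, the `.parquet` suffix filter, and the assumption that this runs inside Spark SQL's own `org.apache.spark.sql` packages (the object is package-private, and is renamed to `ParquetRelation` by this patch) are illustrative, not taken from the patch.

    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.parquet.ParquetRelation2
    import org.apache.spark.sql.types.StructType

    // Lists the part-files of a Parquet dataset and merges their schemas with a Spark job.
    // Returns None when there are no files to read.
    def mergedSchemaOf(sqlContext: SQLContext, dir: String): Option[StructType] = {
      val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
      val path = new Path(dir)
      val fs = path.getFileSystem(hadoopConf)
      val parquetFiles: Seq[FileStatus] =
        fs.listStatus(path).filter(_.getPath.getName.endsWith(".parquet")).toSeq
      ParquetRelation2.mergeSchemasInParallel(parquetFiles, sqlContext)
    }
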
- /**
- * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string
- * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
- * a [[StructType]] converted from the [[MessageType]] stored in this footer.
- */
- def readSchemaFromFooter(
- footer: Footer, converter: CatalystSchemaConverter): StructType = {
- val fileMetaData = footer.getParquetMetadata.getFileMetaData
- fileMetaData
- .getKeyValueMetaData
- .toMap
- .get(RowReadSupport.SPARK_METADATA_KEY)
- .flatMap(deserializeSchemaString)
- .getOrElse(converter.convert(fileMetaData.getSchema))
- }
-
- private def deserializeSchemaString(schemaString: String): Option[StructType] = {
- // Tries to deserialize the schema string as JSON first, then falls back to the case class
- // string parser (data generated by older versions of Spark SQL uses this format).
- Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
- case _: Throwable =>
- logInfo(
- s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
- "falling back to the deprecated DataType.fromCaseClassString parser.")
- DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
- }.recoverWith {
- case cause: Throwable =>
-         logWarning(
-           "Failed to parse the serialized Spark schema in Parquet key-value metadata " +
-             s"(ignoring it):\n\t$schemaString", cause)
- Failure(cause)
- }.toOption
- }
-}
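
For reference, a small sketch of the metadata round trip the two methods above rely on (the schema is invented for the example): the value stored under the Spark metadata key is simply the JSON produced by `StructType#json`, and `DataType.fromJson` recovers it; the `fromCaseClassString` fallback only matters for files written by much older Spark SQL versions.

    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))

    val serialized = schema.json                    // what ends up in the Parquet footer
    val restored   = DataType.fromJson(serialized)  // what readSchemaFromFooter recovers
    assert(restored == schema)
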
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 23df102cd9..b6a7c4fbdd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.parquet
-import org.scalatest.BeforeAndAfterAll
import org.apache.parquet.filter2.predicate.Operators._
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
@@ -40,7 +39,7 @@ import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
* 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred
* data type is nullable.
*/
-class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
+class ParquetFilterSuite extends QueryTest with ParquetTest {
lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
private def checkFilterPredicate(
@@ -56,17 +55,9 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
- val maybeAnalyzedPredicate = {
- val forParquetTableScan = query.queryExecution.executedPlan.collect {
- case plan: ParquetTableScan => plan.columnPruningPred
- }.flatten.reduceOption(_ && _)
-
- val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
- }.flatten.reduceOption(_ && _)
-
- forParquetTableScan.orElse(forParquetDataSource)
- }
+ val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+ case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
+ }.flatten.reduceOption(_ && _)
assert(maybeAnalyzedPredicate.isDefined)
maybeAnalyzedPredicate.foreach { pred =>
@@ -98,7 +89,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
(implicit df: DataFrame): Unit = {
def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
- assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
+ assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) {
df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
}
}
@@ -308,18 +299,6 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
'_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
}
}
-}
-
-class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {
- lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
- override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
test("SPARK-6554: don't push down predicates which reference partition columns") {
import sqlContext.implicits._
@@ -338,37 +317,3 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
}
}
}
-
-class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {
- lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
- override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
-
- override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-
- test("SPARK-6742: don't push down predicates which reference partition columns") {
- import sqlContext.implicits._
-
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- withTempPath { dir =>
- val path = s"${dir.getCanonicalPath}/part=1"
- (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
-
- // If the "part = 1" filter gets pushed down, this query will throw an exception since
- // "part" is not a valid column in the actual Parquet file
- val df = DataFrame(sqlContext, org.apache.spark.sql.parquet.ParquetRelation(
- path,
- Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
- Seq(AttributeReference("part", IntegerType, false)()) ))
-
- checkAnswer(
- df.filter("a = 1 or part = 1"),
- (1 to 3).map(i => Row(1, i, i.toString)))
- }
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 3a5b860484..b5314a3dd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -32,7 +32,6 @@ import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, P
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
-import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkException
import org.apache.spark.sql._
@@ -63,7 +62,7 @@ private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteS
/**
* A test suite that tests basic Parquet I/O.
*/
-class ParquetIOSuiteBase extends QueryTest with ParquetTest {
+class ParquetIOSuite extends QueryTest with ParquetTest {
lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
import sqlContext.implicits._
@@ -357,7 +356,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
""".stripMargin)
withTempPath { location =>
- val extraMetadata = Map(RowReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
+ val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
val path = new Path(location.getCanonicalPath)
@@ -422,26 +421,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
}
}
-}
-
-class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
- extends ParquetOutputCommitter(outputPath, context) {
-
- override def commitJob(jobContext: JobContext): Unit = {
- sys.error("Intentional exception for testing purposes")
- }
-}
-
-class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
- private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
- override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key, originalConf.toString)
- }
test("SPARK-6330 regression test") {
// In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
@@ -456,14 +435,10 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
}
}
-class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
- private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
- override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
- override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
+ override def commitJob(jobContext: JobContext): Unit = {
+ sys.error("Intentional exception for testing purposes")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 7f16b1125c..2eef10189f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -467,7 +467,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: ParquetRelation2) =>
+ case LogicalRelation(relation: ParquetRelation) =>
assert(relation.partitionSpec === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 21007d95ed..c037faf4cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.parquet
import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
@@ -26,7 +25,7 @@ import org.apache.spark.sql.{QueryTest, Row, SQLConf}
/**
* A test suite that tests various Parquet queries.
*/
-class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
+class ParquetQuerySuite extends QueryTest with ParquetTest {
lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
import sqlContext.sql
@@ -164,27 +163,3 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
}
}
}
-
-class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
- private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
- override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-}
-
-class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {
- private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
-
- override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
-
- override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index fa62939267..4a0b3b60f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -378,7 +378,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ ParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -393,7 +393,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ ParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@@ -404,7 +404,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ ParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -415,7 +415,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Conflicting non-nullable field names
intercept[Throwable] {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ ParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -429,7 +429,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ ParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -442,7 +442,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ ParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 4cdb83c511..1b8edefef4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -444,9 +444,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
HiveDDLStrategy,
DDLStrategy,
TakeOrderedAndProject,
- ParquetOperations,
InMemoryScans,
- ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0a2121c955..2629235312 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.Warehouse
@@ -30,7 +29,6 @@ import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -39,10 +37,11 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
@@ -260,8 +259,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+ ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
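
A hedged illustration of the case-insensitivity issue mentioned in the comment above (field names invented, mirroring the expectations in ParquetSchemaSuite): Hive stores column names lower-cased, while the Parquet files may preserve the original casing, so the relation needs the serialized Metastore schema to reconcile the two, keeping Parquet's field names and the Metastore's types and nullability.

    import org.apache.spark.sql.types._

    val metastoreSchema = StructType(Seq(StructField("uppercase", DoubleType, nullable = false)))
    val parquetSchema   = StructType(Seq(StructField("UPPERCase", DoubleType, nullable = false)))

    // mergeMetastoreParquetSchema(metastoreSchema, parquetSchema) reconciles them to:
    // StructType(Seq(StructField("UPPERCase", DoubleType, nullable = false)))
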
@@ -272,7 +271,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
@@ -317,7 +316,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new ParquetRelation2(
+ new ParquetRelation(
paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -330,7 +329,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
+ new ParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@@ -370,8 +369,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
- *
- * This rule can be considered as [[HiveStrategies.ParquetConversion]] done right.
*/
object ParquetConversions extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
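
A hedged way to observe the effect of this rule, mirroring the assertions used in the test suites below (it assumes the code runs inside Spark SQL's own test packages, where `LogicalRelation` and the query execution internals are visible).

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.datasources.LogicalRelation
    import org.apache.spark.sql.parquet.ParquetRelation

    // True once ParquetConversions has replaced the MetastoreRelation with a data source
    // ParquetRelation in the optimized plan.
    def isConvertedToDataSource(df: DataFrame): Boolean =
      df.queryExecution.optimizedPlan.collectFirst {
        case LogicalRelation(_: ParquetRelation) => true
      }.getOrElse(false)
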
@@ -386,7 +383,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
- conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -397,7 +393,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
- conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -406,7 +401,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
- conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a22c3292ef..cd6cd322c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,23 +17,14 @@
package org.apache.spark.sql.hive
-import scala.collection.JavaConversions._
-
-import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.types.StringType
private[hive] trait HiveStrategies {
@@ -42,136 +33,6 @@ private[hive] trait HiveStrategies {
val hiveContext: HiveContext
- /**
- * :: Experimental ::
- * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
- * table scan operator.
- *
- * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
- * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
- *
- * Other issues:
- * - Much of this logic assumes case insensitive resolution.
- */
- @Experimental
- object ParquetConversion extends Strategy {
- implicit class LogicalPlanHacks(s: DataFrame) {
- def lowerCase: DataFrame = DataFrame(s.sqlContext, s.logicalPlan)
-
- def addPartitioningAttributes(attrs: Seq[Attribute]): DataFrame = {
- // Don't add the partitioning key if its already present in the data.
- if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
- s
- } else {
- DataFrame(
- s.sqlContext,
- s.logicalPlan transform {
- case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
- })
- }
- }
- }
-
- implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
- def fakeOutput(newOutput: Seq[Attribute]): OutputFaker =
- OutputFaker(
- originalPlan.output.map(a =>
- newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
- .getOrElse(
- sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
- originalPlan)
- }
-
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
- if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
- hiveContext.convertMetastoreParquet &&
- !hiveContext.conf.parquetUseDataSourceApi =>
-
- // Filter out all predicates that only deal with partition keys
- val partitionsKeys = AttributeSet(relation.partitionKeys)
- val (pruningPredicates, otherPredicates) = predicates.partition {
- _.references.subsetOf(partitionsKeys)
- }
-
- // We are going to throw the predicates and projection back at the whole optimization
- // sequence so lets unresolve all the attributes, allowing them to be rebound to the
- // matching parquet attributes.
- val unresolvedOtherPredicates = Column(otherPredicates.map(_ transform {
- case a: AttributeReference => UnresolvedAttribute(a.name)
- }).reduceOption(And).getOrElse(Literal(true)))
-
- val unresolvedProjection: Seq[Column] = projectList.map(_ transform {
- case a: AttributeReference => UnresolvedAttribute(a.name)
- }).map(Column(_))
-
- try {
- if (relation.hiveQlTable.isPartitioned) {
- val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
- // Translate the predicate so that it automatically casts the input values to the
- // correct data types during evaluation.
- val castedPredicate = rawPredicate transform {
- case a: AttributeReference =>
- val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
- val key = relation.partitionKeys(idx)
- Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
- }
-
- val inputData = new GenericMutableRow(relation.partitionKeys.size)
- val pruningCondition =
- if (codegenEnabled) {
- GeneratePredicate.generate(castedPredicate)
- } else {
- InterpretedPredicate.create(castedPredicate)
- }
-
- val partitions = relation.getHiveQlPartitions(pruningPredicates).filter { part =>
- val partitionValues = part.getValues
- var i = 0
- while (i < partitionValues.size()) {
- inputData(i) = CatalystTypeConverters.convertToCatalyst(partitionValues(i))
- i += 1
- }
- pruningCondition(inputData)
- }
-
- val partitionLocations = partitions.map(_.getLocation)
-
- if (partitionLocations.isEmpty) {
- PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
- } else {
- hiveContext
- .read.parquet(partitionLocations: _*)
- .addPartitioningAttributes(relation.partitionKeys)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection: _*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)) :: Nil
- }
-
- } else {
- hiveContext
- .read.parquet(relation.hiveQlTable.getDataLocation.toString)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection: _*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)) :: Nil
- }
- } catch {
- // parquetFile will throw an exception when there is no data.
- // TODO: Remove this hack for Spark 1.3.
- case iae: java.lang.IllegalArgumentException
- if iae.getMessage.contains("Can not create a Path from an empty string") =>
- PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
- }
- case _ => Nil
- }
- }
-
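
For context, a compact sketch of the core idea the removed strategy implemented, and which the data source path now handles internally: predicates that reference only partition keys are split off and evaluated against partition values on the driver, while the remaining predicates stay with the Parquet scan (names follow the removed code above).

    import org.apache.spark.sql.catalyst.expressions._

    def splitPartitionPredicates(
        predicates: Seq[Expression],
        partitionKeys: Seq[Attribute]): (Seq[Expression], Seq[Expression]) = {
      val partitionKeySet = AttributeSet(partitionKeys)
      // Left: predicates usable for partition pruning; right: everything else.
      predicates.partition(_.references.subsetOf(partitionKeySet))
    }
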
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index af68615e8e..a45c2d9572 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql.{QueryTest, Row}
case class Cases(lower: String, UPPER: String)
@@ -28,64 +28,54 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
import sqlContext._
- def run(prefix: String): Unit = {
- test(s"$prefix: Case insensitive attribute names") {
- withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
- val expected = (1 to 4).map(i => Row(i.toString))
- checkAnswer(sql("SELECT upper FROM cases"), expected)
- checkAnswer(sql("SELECT LOWER FROM cases"), expected)
- }
+ test("Case insensitive attribute names") {
+ withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+ val expected = (1 to 4).map(i => Row(i.toString))
+ checkAnswer(sql("SELECT upper FROM cases"), expected)
+ checkAnswer(sql("SELECT LOWER FROM cases"), expected)
}
+ }
- test(s"$prefix: SELECT on Parquet table") {
- val data = (1 to 4).map(i => (i, s"val_$i"))
- withParquetTable(data, "t") {
- checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
- }
+ test("SELECT on Parquet table") {
+ val data = (1 to 4).map(i => (i, s"val_$i"))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
}
+ }
- test(s"$prefix: Simple column projection + filter on Parquet table") {
- withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
- checkAnswer(
- sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
- Seq(Row(true, "val_2"), Row(true, "val_4")))
- }
+ test("Simple column projection + filter on Parquet table") {
+ withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+ checkAnswer(
+ sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+ Seq(Row(true, "val_2"), Row(true, "val_4")))
}
+ }
- test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
- withTempPath { dir =>
- sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
- read.parquet(dir.getCanonicalPath).registerTempTable("p")
- withTempTable("p") {
- checkAnswer(
- sql("SELECT * FROM src ORDER BY key"),
- sql("SELECT * from p ORDER BY key").collect().toSeq)
- }
+ test("Converting Hive to Parquet Table via saveAsParquetFile") {
+ withTempPath { dir =>
+ sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
+ read.parquet(dir.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ checkAnswer(
+ sql("SELECT * FROM src ORDER BY key"),
+ sql("SELECT * from p ORDER BY key").collect().toSeq)
}
}
+ }
- test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
- withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
- withTempPath { file =>
- sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
- read.parquet(file.getCanonicalPath).registerTempTable("p")
- withTempTable("p") {
- // let's do three overwrites for good measure
- sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
- sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
- sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
- checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
- }
+ test("INSERT OVERWRITE TABLE Parquet table") {
+ withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
+ withTempPath { file =>
+ sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
+ read.parquet(file.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ // let's do three overwrites for good measure
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
}
}
}
}
-
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
- run("Parquet data source enabled")
- }
-
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "false") {
- run("Parquet data source disabled")
- }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e403f32efa..4fdf774ead 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,10 +21,9 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.BeforeAndAfterAll
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark.Logging
import org.apache.spark.sql._
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -564,10 +563,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
}
test("scan a parquet table created through a CTAS statement") {
- withSQLConf(
- HiveContext.CONVERT_METASTORE_PARQUET.key -> "true",
- SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
-
+ withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "true") {
withTempTable("jt") {
(1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
@@ -582,9 +578,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
Row(3) :: Row(4) :: Nil)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
+ case LogicalRelation(p: ParquetRelation) => // OK
case _ =>
- fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
+ fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0342826542..ff42fdefaa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
case class Nested1(f1: Nested2)
@@ -61,7 +62,9 @@ class MyDialect extends DefaultParserDialect
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
* valid, but Hive currently cannot execute it.
*/
-class SQLQuerySuite extends QueryTest {
+class SQLQuerySuite extends QueryTest with SQLTestUtils {
+ override def sqlContext: SQLContext = TestHive
+
test("SPARK-6835: udtf in lateral view") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.registerTempTable("table1")
@@ -195,17 +198,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: ParquetRelation2) =>
+ case LogicalRelation(r: ParquetRelation) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
- s"${ParquetRelation2.getClass.getCanonicalName}.")
+ s"${ParquetRelation.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
- s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+ s"${ParquetRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
@@ -350,33 +353,26 @@ class SQLQuerySuite extends QueryTest {
"serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22", "MANAGED_TABLE"
)
- val origUseParquetDataSource = conf.parquetUseDataSourceApi
- try {
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- sql(
- """CREATE TABLE ctas5
- | STORED AS parquet AS
- | SELECT key, value
- | FROM src
- | ORDER BY key, value""".stripMargin).collect()
-
- checkExistence(sql("DESC EXTENDED ctas5"), true,
- "name:key", "type:string", "name:value", "ctas5",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
- "MANAGED_TABLE"
- )
-
- val default = convertMetastoreParquet
- // use the Hive SerDe for parquet tables
- sql("set spark.sql.hive.convertMetastoreParquet = false")
+ sql(
+ """CREATE TABLE ctas5
+ | STORED AS parquet AS
+ | SELECT key, value
+ | FROM src
+ | ORDER BY key, value""".stripMargin).collect()
+
+ checkExistence(sql("DESC EXTENDED ctas5"), true,
+ "name:key", "type:string", "name:value", "ctas5",
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+ "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+ "MANAGED_TABLE"
+ )
+
+ // use the Hive SerDe for parquet tables
+ withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
checkAnswer(
sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
- sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
- } finally {
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 82a8daf8b4..f56fb96c52 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,13 +22,13 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -57,7 +57,7 @@ case class ParquetDataWithKeyAndComplexTypes(
* A suite to test the automatic conversion of metastore tables with parquet data to use the
* built in parquet support.
*/
-class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
+class ParquetMetastoreSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -134,6 +134,19 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
""")
+ sql(
+ """
+ |create table test_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}
@@ -166,6 +179,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql("DROP TABLE normal_parquet")
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS jt_array")
+ sql("DROP TABLE IF EXISTS test_parquet")
setConf(HiveContext.CONVERT_METASTORE_PARQUET, false)
}
@@ -176,40 +190,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
- case _: ParquetTableScan => true
case _: PhysicalRDD => true
}.nonEmpty)
}
-}
-
-class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
-
- sql(
- """
- |create table test_parquet
- |(
- | intField INT,
- | stringField STRING
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- sql("DROP TABLE IF EXISTS test_parquet")
-
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
test("scan an empty parquet table") {
checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
@@ -292,10 +275,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: ParquetRelation2) => // OK
+ case LogicalRelation(_: ParquetRelation) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[ParquetRelation2].getCanonicalName}")
+ s"${classOf[ParquetRelation].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -316,9 +299,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[ParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@@ -346,9 +329,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[ParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@@ -379,17 +362,17 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: ParquetRelation2) => r
+ case r @ LogicalRelation(_: ParquetRelation) => r
}.size
}
sql("DROP TABLE ms_convert")
}
- def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+ def collectParquetRelation(df: DataFrame): ParquetRelation = {
val plan = df.queryExecution.analyzed
plan.collectFirst {
- case LogicalRelation(r: ParquetRelation2) => r
+ case LogicalRelation(r: ParquetRelation) => r
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$plan")
}
@@ -439,7 +422,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
@@ -543,81 +526,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
}
}
-class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-
- test("MetastoreRelation in InsertIntoTable will not be converted") {
- sql(
- """
- |create table test_insert_parquet
- |(
- | intField INT
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
- df.queryExecution.executedPlan match {
- case insert: execution.InsertIntoHiveTable => // OK
- case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
- s"However, found ${o.toString}.")
- }
-
- checkAnswer(
- sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
- sql("SELECT a FROM jt WHERE jt.a > 5").collect()
- )
-
- sql("DROP TABLE IF EXISTS test_insert_parquet")
- }
-
- // TODO: enable it after the fix of SPARK-5950.
- ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
- sql(
- """
- |create table test_insert_parquet
- |(
- | int_array array<int>
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
- df.queryExecution.executedPlan match {
- case insert: execution.InsertIntoHiveTable => // OK
- case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
- s"However, found ${o.toString}.")
- }
-
- checkAnswer(
- sql("SELECT int_array FROM test_insert_parquet"),
- sql("SELECT a FROM jt_array").collect()
- )
-
- sql("DROP TABLE IF EXISTS test_insert_parquet")
- }
-}
-
/**
* A suite of tests for the Parquet support through the data sources API.
*/
-class ParquetSourceSuiteBase extends ParquetPartitioningTest {
+class ParquetSourceSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -712,20 +624,6 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
}
}
}
-}
-
-class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
test("values in arrays and maps stored in parquet are always nullable") {
val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
@@ -734,7 +632,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
val expectedSchema1 =
StructType(
StructField("m", mapType1, nullable = true) ::
- StructField("a", arrayType1, nullable = true) :: Nil)
+ StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
df.write.format("parquet").saveAsTable("alwaysNullable")
@@ -772,20 +670,6 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
}
}
-class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-}
-
/**
* A collection of tests for parquet data with various forms of partitioning.
*/