path: root/sql
author     Cheng Lian <lian@databricks.com>    2015-05-13 11:04:10 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-05-13 11:04:10 -0700
commit     7ff16e8abef9fbf4a4855e23c256b22e62e560a6 (patch)
tree       1be1249ecb9db02ef5bf8820f7c44a7fbe71a6ff /sql
parent     bec938f777a2e18757c7d04504d86a5342e2b49e (diff)
[SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation
This PR migrates the Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are:

1. Partition discovery code has been factored out to `FSBasedRelation`.
2. `AppendingParquetOutputFormat` is no longer used. Instead, an anonymous subclass of `ParquetOutputFormat` handles appending and writing dynamic partitions.
3. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition.
4. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push-down, so it no longer extends `CatalystScan` (a sketch of the new filter conversion follows the diffstat below).

After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`.

Author: Cheng Lian <lian@databricks.com>

Closes #6090 from liancheng/parquet-migration and squashes the following commits:

6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommitter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
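For orientation, here is a minimal usage sketch (not part of this commit) of the migrated read path. It assumes a Spark 1.4-era `SQLContext` with `conf.parquetUseDataSourceApi` enabled, so that `parquetFile` globs its input paths and builds an `FSBasedParquetRelation`, as shown in the `SQLContext.scala` hunk below; the HDFS path and column names are hypothetical.

```scala
import org.apache.spark.sql.SQLContext

// Sketch only: reads a (hypothetical) partitioned Parquet table through the new
// FSBasedParquetRelation code path. Requires spark-core and spark-sql on the classpath.
object ParquetReadSketch {
  def readPartitionedEvents(sqlContext: SQLContext): Unit = {
    // parquetFile() now globs the paths and wraps them in an FSBasedParquetRelation.
    val df = sqlContext.parquetFile("hdfs://namenode/warehouse/events")

    // Partition columns discovered from the directory layout (e.g. year=2015/month=05)
    // appear as ordinary columns; buildScan only reads the selected partitions.
    df.filter(df("year") === 2015).select("id", "month").show()
  }
}
```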
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 278
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala | 565
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 840
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 25
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 35
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala) | 173
15 files changed, 920 insertions, 1090 deletions
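As noted above, filter push-down now starts from data source `Filter`s rather than Catalyst expressions. Below is a rough, self-contained sketch (not part of this commit) of the new conversion in `ParquetFilters.createFilter(schema, filter)`; the column name and values are made up, and the object lives in `org.apache.spark.sql.parquet` because `ParquetFilters` is `private[sql]`.

```scala
package org.apache.spark.sql.parquet

import parquet.filter2.predicate.FilterApi

import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object FilterPushDownSketch {
  def main(args: Array[String]): Unit = {
    // Schema of the (hypothetical) Parquet data; createFilter needs it to pick column types.
    val schema = StructType(Seq(StructField("age", IntegerType)))

    // Data source filters as they would arrive in FSBasedParquetRelation.buildScan.
    val filters = Seq(sources.GreaterThan("age", 21), sources.IsNotNull("age"))

    // Not every filter is convertible (createFilter returns an Option), hence flatMap;
    // the surviving predicates are AND-ed together, mirroring buildScan before it calls
    // ParquetInputFormat.setFilterPredicate.
    val predicate = filters
      .flatMap(ParquetFilters.createFilter(schema, _))
      .reduceOption(FilterApi.and)

    println(predicate)
  }
}
```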
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 975498c11f..0a148c7cd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -27,9 +27,11 @@ import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
import com.google.common.reflect.TypeToken
+import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -42,6 +44,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, e
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -641,7 +644,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
if (paths.isEmpty) {
emptyDataFrame
} else if (conf.parquetUseDataSourceApi) {
- baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
+ val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+ baseRelationToDataFrame(
+ new FSBasedParquetRelation(
+ globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 5eb1c6abc2..f0f4e7d147 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -29,128 +29,184 @@ import parquet.io.api.Binary
import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
- filterExpressions.flatMap(createFilter).reduceOption(FilterApi.and).map(FilterCompat.get)
+ filterExpressions.flatMap { filter =>
+ createFilter(filter)
+ }.reduceOption(FilterApi.and).map(FilterCompat.get)
}
- def createFilter(predicate: Expression): Option[FilterPredicate] = {
- val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case BooleanType =>
- (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
- (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
- case LongType =>
- (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
- (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
- (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
- // Binary.fromString and Binary.fromByteArray don't accept null values
- case StringType =>
- (n: String, v: Any) => FilterApi.eq(
- binaryColumn(n),
- Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
- case BinaryType =>
- (n: String, v: Any) => FilterApi.eq(
- binaryColumn(n),
- Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- }
+ private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+ case BooleanType =>
+ (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
+ case IntegerType =>
+ (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
+ case LongType =>
+ (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ case FloatType =>
+ (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ case DoubleType =>
+ (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case BooleanType =>
- (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
- (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
- case LongType =>
- (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
- (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
- (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
- (n: String, v: Any) => FilterApi.notEq(
- binaryColumn(n),
- Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
- case BinaryType =>
- (n: String, v: Any) => FilterApi.notEq(
- binaryColumn(n),
- Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- }
+ // Binary.fromString and Binary.fromByteArray don't accept null values
+ case StringType =>
+ (n: String, v: Any) => FilterApi.eq(
+ binaryColumn(n),
+ Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
+ case BinaryType =>
+ (n: String, v: Any) => FilterApi.eq(
+ binaryColumn(n),
+ Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ }
- val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
- case LongType =>
- (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
- (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
- (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
- (n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
- case BinaryType =>
- (n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- }
+ private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+ case BooleanType =>
+ (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
+ case IntegerType =>
+ (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
+ case LongType =>
+ (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ case FloatType =>
+ (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ case DoubleType =>
+ (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ case StringType =>
+ (n: String, v: Any) => FilterApi.notEq(
+ binaryColumn(n),
+ Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[UTF8String].getBytes)).orNull)
+ case BinaryType =>
+ (n: String, v: Any) => FilterApi.notEq(
+ binaryColumn(n),
+ Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ }
- val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
- case LongType =>
- (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
- (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
- (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
- (n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
- case BinaryType =>
- (n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- }
+ private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+ case IntegerType =>
+ (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
+ case LongType =>
+ (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
+ case FloatType =>
+ (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ case DoubleType =>
+ (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ case StringType =>
+ (n: String, v: Any) =>
+ FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+ case BinaryType =>
+ (n: String, v: Any) =>
+ FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ }
- val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
- case LongType =>
- (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
- (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
- (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
- (n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
- case BinaryType =>
- (n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- }
+ private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+ case IntegerType =>
+ (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
+ case LongType =>
+ (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ case FloatType =>
+ (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ case DoubleType =>
+ (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ case StringType =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+ case BinaryType =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ }
- val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
- case LongType =>
- (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
- case FloatType =>
- (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
- case DoubleType =>
- (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- case StringType =>
- (n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
- case BinaryType =>
- (n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+ case IntegerType =>
+ (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
+ case LongType =>
+ (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
+ case FloatType =>
+ (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ case DoubleType =>
+ (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ case StringType =>
+ (n: String, v: Any) =>
+ FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+ case BinaryType =>
+ (n: String, v: Any) =>
+ FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ }
+
+ private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
+ case IntegerType =>
+ (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
+ case LongType =>
+ (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
+ case FloatType =>
+ (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
+ case DoubleType =>
+ (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ case StringType =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[UTF8String].getBytes))
+ case BinaryType =>
+ (n: String, v: Any) =>
+ FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ }
+
+ /**
+ * Converts data sources filters to Parquet filter predicates.
+ */
+ def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
+ val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+ // NOTE:
+ //
+ // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
+ // which can be casted to `false` implicitly. Please refer to the `eval` method of these
+ // operators and the `SimplifyFilters` rule for details.
+ predicate match {
+ case sources.IsNull(name) =>
+ makeEq.lift(dataTypeOf(name)).map(_(name, null))
+ case sources.IsNotNull(name) =>
+ makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
+
+ case sources.EqualTo(name, value) =>
+ makeEq.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.Not(sources.EqualTo(name, value)) =>
+ makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+
+ case sources.LessThan(name, value) =>
+ makeLt.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.LessThanOrEqual(name, value) =>
+ makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
+
+ case sources.GreaterThan(name, value) =>
+ makeGt.lift(dataTypeOf(name)).map(_(name, value))
+ case sources.GreaterThanOrEqual(name, value) =>
+ makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+
+ case sources.And(lhs, rhs) =>
+ (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+
+ case sources.Or(lhs, rhs) =>
+ for {
+ lhsFilter <- createFilter(schema, lhs)
+ rhsFilter <- createFilter(schema, rhs)
+ } yield FilterApi.or(lhsFilter, rhsFilter)
+
+ case sources.Not(pred) =>
+ createFilter(schema, pred).map(FilterApi.not)
+
+ case _ => None
}
+ }
+ /**
+ * Converts Catalyst predicate expressions to Parquet filter predicates.
+ *
+ * @todo This can be removed once we get rid of the old Parquet support.
+ */
+ def createFilter(predicate: Expression): Option[FilterPredicate] = {
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -170,7 +226,7 @@ private[sql] object ParquetFilters {
makeEq.lift(dataType).map(_(name, value))
case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
makeEq.lift(dataType).map(_(name, value))
-
+
case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
makeNotEq.lift(dataType).map(_(name, value))
case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
@@ -192,7 +248,7 @@ private[sql] object ParquetFilters {
case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
makeLtEq.lift(dataType).map(_(name, value))
case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeLtEq.lift(dataType).map(_(name, value))
+ makeLtEq.lift(dataType).map(_(name, value))
case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
makeGtEq.lift(dataType).map(_(name, value))
case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
@@ -201,7 +257,7 @@ private[sql] object ParquetFilters {
case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
makeGt.lift(dataType).map(_(name, value))
case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeGt.lift(dataType).map(_(name, value))
+ makeGt.lift(dataType).map(_(name, value))
case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
makeLt.lift(dataType).map(_(name, value))
case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
@@ -210,7 +266,7 @@ private[sql] object ParquetFilters {
case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
makeGtEq.lift(dataType).map(_(name, value))
case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
- makeGtEq.lift(dataType).map(_(name, value))
+ makeGtEq.lift(dataType).map(_(name, value))
case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
makeLtEq.lift(dataType).map(_(name, value))
case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 75ac52d4a9..90950f924a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -674,7 +674,7 @@ private[parquet] object FileSystemHelper {
def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
val files = FileSystemHelper.listFiles(pathStr, conf)
// filename pattern is part-r-<int>.parquet
- val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
+ val nameP = new scala.util.matching.Regex("""part-.-(\d{1,}).*""", "taskid")
val hiddenFileP = new scala.util.matching.Regex("_.*")
files.map(_.getName).map {
case nameP(taskid) => taskid.toInt
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
new file mode 100644
index 0000000000..d810d6a028
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import parquet.filter2.predicate.FilterApi
+import parquet.format.converter.ParquetMetadataConverter
+import parquet.hadoop._
+import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+
+private[sql] class DefaultSource extends FSBasedRelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ paths: Array[String],
+ schema: Option[StructType],
+ partitionColumns: Option[StructType],
+ parameters: Map[String, String]): FSBasedRelation = {
+ val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
+ new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
+ }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[sql] class ParquetOutputWriter extends OutputWriter {
+ private var recordWriter: RecordWriter[Void, Row] = _
+ private var taskAttemptContext: TaskAttemptContext = _
+
+ override def init(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): Unit = {
+ val conf = context.getConfiguration
+ val outputFormat = {
+ // When appending new Parquet files to an existing Parquet file directory, to avoid
+ // overwriting existing data files, we need to find out the max task ID encoded in these data
+ // file names.
+ // TODO Make this snippet a utility function for other data source developers
+ val maxExistingTaskId = {
+ // Note that `path` may point to a temporary location. Here we retrieve the real
+ // destination path from the configuration
+ val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
+ val fs = outputPath.getFileSystem(conf)
+
+ if (fs.exists(outputPath)) {
+ // Pattern used to match task ID in part file names, e.g.:
+ //
+ // part-r-00001.gz.part
+ // ^~~~~
+ val partFilePattern = """part-.-(\d{1,}).*""".r
+
+ fs.listStatus(outputPath).map(_.getPath.getName).map {
+ case partFilePattern(id) => id.toInt
+ case name if name.startsWith("_") => 0
+ case name if name.startsWith(".") => 0
+ case name => sys.error(
+ s"""Trying to write Parquet files to directory $outputPath,
+ |but found items with illegal name "$name"
+ """.stripMargin.replace('\n', ' ').trim)
+ }.reduceOption(_ max _).getOrElse(0)
+ } else {
+ 0
+ }
+ }
+
+ new ParquetOutputFormat[Row]() {
+ // Here we override `getDefaultWorkFile` for two reasons:
+ //
+ // 1. To allow appending. We need to generate output file name based on the max available
+ // task ID computed above.
+ //
+ // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
+ // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
+ // partitions in the case of dynamic partitioning.
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
+ new Path(path, f"part-r-$split%05d$extension")
+ }
+ }
+ }
+
+ recordWriter = outputFormat.getRecordWriter(context)
+ taskAttemptContext = context
+ }
+
+ override def write(row: Row): Unit = recordWriter.write(null, row)
+
+ override def close(): Unit = recordWriter.close(taskAttemptContext)
+}
+
+private[sql] class FSBasedParquetRelation(
+ paths: Array[String],
+ private val maybeDataSchema: Option[StructType],
+ private val maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])(
+ val sqlContext: SQLContext)
+ extends FSBasedRelation(paths, maybePartitionSpec)
+ with Logging {
+
+ // Should we merge schemas from all Parquet part-files?
+ private val shouldMergeSchemas =
+ parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
+
+ private val maybeMetastoreSchema = parameters
+ .get(FSBasedParquetRelation.METASTORE_SCHEMA)
+ .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+ private val metadataCache = new MetadataCache
+ metadataCache.refresh()
+
+ override def equals(other: scala.Any): Boolean = other match {
+ case that: FSBasedParquetRelation =>
+ val schemaEquality = if (shouldMergeSchemas) {
+ this.shouldMergeSchemas == that.shouldMergeSchemas
+ } else {
+ this.dataSchema == that.dataSchema &&
+ this.schema == that.schema
+ }
+
+ this.paths.toSet == that.paths.toSet &&
+ schemaEquality &&
+ this.maybeDataSchema == that.maybeDataSchema &&
+ this.partitionColumns == that.partitionColumns
+
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ if (shouldMergeSchemas) {
+ Objects.hashCode(
+ Boolean.box(shouldMergeSchemas),
+ paths.toSet,
+ maybeDataSchema,
+ maybePartitionSpec)
+ } else {
+ Objects.hashCode(
+ Boolean.box(shouldMergeSchemas),
+ paths.toSet,
+ dataSchema,
+ schema,
+ maybeDataSchema,
+ maybePartitionSpec)
+ }
+ }
+
+ override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
+
+ override def dataSchema: StructType = metadataCache.dataSchema
+
+ override private[sql] def refresh(): Unit = {
+ metadataCache.refresh()
+ super.refresh()
+ }
+
+ // Parquet data source always uses Catalyst internal representations.
+ override val needConversion: Boolean = false
+
+ override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
+
+ override def prepareForWrite(job: Job): Unit = {
+ val conf = ContextUtil.getConfiguration(job)
+
+ val committerClass =
+ conf.getClass(
+ "spark.sql.parquet.output.committer.class",
+ classOf[ParquetOutputCommitter],
+ classOf[ParquetOutputCommitter])
+
+ conf.setClass(
+ "mapred.output.committer.class",
+ committerClass,
+ classOf[ParquetOutputCommitter])
+
+ // TODO There's no need to use two kinds of WriteSupport
+ // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
+ // complex types.
+ val writeSupportClass =
+ if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
+ classOf[MutableRowWriteSupport]
+ } else {
+ classOf[RowWriteSupport]
+ }
+
+ ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
+ RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+
+ // Sets compression scheme
+ conf.set(
+ ParquetOutputFormat.COMPRESSION,
+ ParquetRelation
+ .shortParquetCompressionCodecNames
+ .getOrElse(
+ sqlContext.conf.parquetCompressionCodec.toUpperCase,
+ CompressionCodecName.UNCOMPRESSED).name())
+ }
+
+ override def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputPaths: Array[String]): RDD[Row] = {
+
+ val job = Job.getInstance(SparkHadoopUtil.get.conf)
+ val conf = ContextUtil.getConfiguration(job)
+
+ ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
+ if (inputPaths.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+ }
+
+ // Try to push down filters when filter push-down is enabled.
+ if (sqlContext.conf.parquetFilterPushDown) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
+ .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
+ }
+
+ conf.set(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+ val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+ ParquetTypesConverter.convertToString(requestedSchema.toAttributes)
+ })
+
+ conf.set(
+ RowWriteSupport.SPARK_ROW_SCHEMA,
+ ParquetTypesConverter.convertToString(dataSchema.toAttributes))
+
+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+ val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+ conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+
+ val inputFileStatuses =
+ metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
+
+ val footers = inputFileStatuses.map(metadataCache.footers)
+
+ // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+ // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+ // footers. Especially when a global arbitrative schema (either from metastore or data source
+ // DDL) is available.
+ new NewHadoopRDD(
+ sqlContext.sparkContext,
+ classOf[FilteringParquetRowInputFormat],
+ classOf[Void],
+ classOf[Row],
+ conf) {
+
+ val cacheMetadata = useMetadataCache
+
+ @transient val cachedStatuses = inputFileStatuses.map { f =>
+ // In order to encode the authority of a Path containing special characters such as /,
+ // we need to use the string returned by the URI of the path to create a new Path.
+ val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+ new FileStatus(
+ f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+ f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+ }.toSeq
+
+ @transient val cachedFooters = footers.map { f =>
+ // In order to encode the authority of a Path containing special characters such as /,
+ // we need to use the string returned by the URI of the path to create a new Path.
+ new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+ }.toSeq
+
+ // Overridden so we can inject our own cached files statuses.
+ override def getPartitions: Array[SparkPartition] = {
+ val inputFormat = if (cacheMetadata) {
+ new FilteringParquetRowInputFormat {
+ override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+
+ override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+ }
+ } else {
+ new FilteringParquetRowInputFormat
+ }
+
+ val jobContext = newJobContext(getConf, jobId)
+ val rawSplits = inputFormat.getSplits(jobContext)
+
+ Array.tabulate[SparkPartition](rawSplits.size) { i =>
+ new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ }
+ }.values
+ }
+
+ private class MetadataCache {
+ // `FileStatus` objects of all "_metadata" files.
+ private var metadataStatuses: Array[FileStatus] = _
+
+ // `FileStatus` objects of all "_common_metadata" files.
+ private var commonMetadataStatuses: Array[FileStatus] = _
+
+ // Parquet footer cache.
+ var footers: Map[FileStatus, Footer] = _
+
+ // `FileStatus` objects of all data files (Parquet part-files).
+ var dataStatuses: Array[FileStatus] = _
+
+ // Schema of the actual Parquet files, without partition columns discovered from partition
+ // directory paths.
+ var dataSchema: StructType = _
+
+ // Schema of the whole table, including partition columns.
+ var schema: StructType = _
+
+ /**
+ * Refreshes `FileStatus`es, footers, partition spec, and table schema.
+ */
+ def refresh(): Unit = {
+ // Support either reading a collection of raw Parquet part-files, or a collection of folders
+ // containing Parquet files (e.g. partitioned Parquet table).
+ val baseStatuses = paths.distinct.flatMap { p =>
+ val path = new Path(p)
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ Try(fs.getFileStatus(qualified)).toOption
+ }
+ assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
+
+ // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+ val leaves = baseStatuses.flatMap { f =>
+ val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
+ SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
+ isSummaryFile(f.getPath) ||
+ !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+ }
+ }
+
+ dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+ metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+ commonMetadataStatuses =
+ leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
+ footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
+ val parquetMetadata = ParquetFileReader.readFooter(
+ SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
+ f -> new Footer(f.getPath, parquetMetadata)
+ }.seq.toMap
+
+ dataSchema = {
+ val dataSchema0 =
+ maybeDataSchema
+ .orElse(readSchema())
+ .orElse(maybeMetastoreSchema)
+ .getOrElse(sys.error("Failed to get the schema."))
+
+ // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
+ // insensitivity issue and possible schema mismatch (probably caused by schema
+ // evolution).
+ maybeMetastoreSchema
+ .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
+ .getOrElse(dataSchema0)
+ }
+ }
+
+ private def isSummaryFile(file: Path): Boolean = {
+ file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
+ file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+ }
+
+ private def readSchema(): Option[StructType] = {
+ // Sees which file(s) we need to touch in order to figure out the schema.
+ //
+ // Always tries the summary files first if users don't require a merged schema. In this case,
+ // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
+ // groups information, and could be much smaller for large Parquet files with lots of row
+ // groups.
+ //
+ // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
+ // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
+ // how to merge them correctly if some key is associated with different values in different
+ // part-files. When this happens, Parquet simply gives up generating the summary file. This
+ // implies that if a summary file presents, then:
+ //
+ // 1. Either all part-files have exactly the same Spark SQL schema, or
+ // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
+ // their schemas may differ from each other).
+ //
+ // Here we tend to be pessimistic and take the second case into account. Basically this means
+ // we can't trust the summary files if users require a merged schema, and must touch all part-
+ // files to do the merge.
+ val filesToTouch =
+ if (shouldMergeSchemas) {
+ // Also includes summary files, 'cause there might be empty partition directories.
+ (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+ } else {
+ // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
+ // don't have this.
+ commonMetadataStatuses.headOption
+ // Falls back to "_metadata"
+ .orElse(metadataStatuses.headOption)
+ // Summary file(s) not found, the Parquet file is either corrupted, or different part-
+ // files contain conflicting user defined metadata (two or more values are associated
+ // with a same key in different files). In either case, we fall back to any of the
+ // first part-file, and just assume all schemas are consistent.
+ .orElse(dataStatuses.headOption)
+ .toSeq
+ }
+
+ assert(
+ filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
+ "No schema defined, " +
+ s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+
+ FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
+ }
+ }
+}
+
+private[sql] object FSBasedParquetRelation extends Logging {
+ // Whether we should merge schemas collected from all Parquet part-files.
+ private[sql] val MERGE_SCHEMA = "mergeSchema"
+
+ // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
+ // internally.
+ private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+
+ private[parquet] def readSchema(
+ footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+ footers.map { footer =>
+ val metadata = footer.getParquetMetadata.getFileMetaData
+ val parquetSchema = metadata.getSchema
+ val maybeSparkSchema = metadata
+ .getKeyValueMetaData
+ .toMap
+ .get(RowReadSupport.SPARK_METADATA_KEY)
+ .flatMap { serializedSchema =>
+ // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+ // whatever is available.
+ Try(DataType.fromJson(serializedSchema))
+ .recover { case _: Throwable =>
+ logInfo(
+ s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "falling back to the deprecated DataType.fromCaseClassString parser.")
+ DataType.fromCaseClassString(serializedSchema)
+ }
+ .recover { case cause: Throwable =>
+ logWarning(
+ s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+ |\t$serializedSchema
+ """.stripMargin,
+ cause)
+ }
+ .map(_.asInstanceOf[StructType])
+ .toOption
+ }
+
+ maybeSparkSchema.getOrElse {
+ // Falls back to Parquet schema if Spark SQL schema is absent.
+ StructType.fromAttributes(
+ // TODO Really no need to use `Attribute` here, we only need to know the data type.
+ ParquetTypesConverter.convertToAttributes(
+ parquetSchema,
+ sqlContext.conf.isParquetBinaryAsString,
+ sqlContext.conf.isParquetINT96AsTimestamp))
+ }
+ }.reduceOption { (left, right) =>
+ try left.merge(right) catch { case e: Throwable =>
+ throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+ }
+ }
+ }
+
+ /**
+ * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
+ * schema and Parquet schema.
+ *
+ * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
+ * schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
+ * distinguish binary and string). This method generates a correct schema by merging Metastore
+ * schema data types and Parquet schema field names.
+ */
+ private[parquet] def mergeMetastoreParquetSchema(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ def schemaConflictMessage: String =
+ s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
+ |${metastoreSchema.prettyJson}
+ |
+ |Parquet schema:
+ |${parquetSchema.prettyJson}
+ """.stripMargin
+
+ val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+ assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
+
+ val ordinalMap = metastoreSchema.zipWithIndex.map {
+ case (field, index) => field.name.toLowerCase -> index
+ }.toMap
+
+ val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
+ ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
+
+ StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+ // Uses Parquet field names but retains Metastore data types.
+ case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
+ mSchema.copy(name = pSchema.name)
+ case _ =>
+ throw new SparkException(schemaConflictMessage)
+ })
+ }
+
+ /**
+ * Returns the original schema from the Parquet file with any missing nullable fields from the
+ * Hive Metastore schema merged in.
+ *
+ * When constructing a DataFrame from a collection of structured data, the resulting object has
+ * a schema corresponding to the union of the fields present in each element of the collection.
+ * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+ * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+ * contain a particular nullable field in its schema despite that field being present in the
+ * table schema obtained from the Hive Metastore. This method returns a schema representing the
+ * Parquet file schema along with any additional nullable fields from the Metastore schema
+ * merged in.
+ */
+ private[parquet] def mergeMissingNullableFields(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+ val missingFields = metastoreSchema
+ .map(_.name.toLowerCase)
+ .diff(parquetSchema.map(_.name.toLowerCase))
+ .map(fieldMap(_))
+ .filter(_.nullable)
+ StructType(parquetSchema ++ missingFields)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
deleted file mode 100644
index ee4b1c72a2..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ /dev/null
@@ -1,840 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.parquet
-
-import java.io.IOException
-import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
-import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{ParquetInputFormat, _}
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, expressions}
-import org.apache.spark.sql.parquet.ParquetTypesConverter._
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType, _}
-import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
-import org.apache.spark.{Logging, SerializableWritable, SparkException, TaskContext, Partition => SparkPartition}
-
-/**
- * Allows creation of Parquet based tables using the syntax:
- * {{{
- * CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet OPTIONS (...)
- * }}}
- *
- * Supported options include:
- *
- * - `path`: Required. When reading Parquet files, `path` should point to the location of the
- * Parquet file(s). It can be either a single raw Parquet file, or a directory of Parquet files.
- * In the latter case, this data source tries to discover partitioning information if the the
- * directory is structured in the same style of Hive partitioned tables. When writing Parquet
- * file, `path` should point to the destination folder.
- *
- * - `mergeSchema`: Optional. Indicates whether we should merge potentially different (but
- * compatible) schemas stored in all Parquet part-files.
- *
- * - `partition.defaultName`: Optional. Partition name used when a value of a partition column is
- * null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
- * in Hive.
- */
-private[sql] class DefaultSource
- extends RelationProvider
- with SchemaRelationProvider
- with CreatableRelationProvider {
-
- private def checkPath(parameters: Map[String, String]): String = {
- parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
- }
-
- /** Returns a new base relation with the given parameters. */
- override def createRelation(
- sqlContext: SQLContext,
- parameters: Map[String, String]): BaseRelation = {
- ParquetRelation2(Seq(checkPath(parameters)), parameters, None)(sqlContext)
- }
-
- /** Returns a new base relation with the given parameters and schema. */
- override def createRelation(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- schema: StructType): BaseRelation = {
- ParquetRelation2(Seq(checkPath(parameters)), parameters, Some(schema))(sqlContext)
- }
-
- /** Returns a new base relation with the given parameters and save given data into it. */
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- val path = checkPath(parameters)
- val filesystemPath = new Path(path)
- val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val doInsertion = (mode, fs.exists(filesystemPath)) match {
- case (SaveMode.ErrorIfExists, true) =>
- sys.error(s"path $path already exists.")
- case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
- true
- case (SaveMode.Ignore, exists) =>
- !exists
- }
-
- val relation = if (doInsertion) {
- // This is a hack. We always set nullable/containsNull/valueContainsNull to true
- // for the schema of a parquet data.
- val df =
- sqlContext.createDataFrame(
- data.queryExecution.toRdd,
- data.schema.asNullable,
- needsConversion = false)
- val createdRelation =
- createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
- createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
- createdRelation
- } else {
- // If the save mode is Ignore, we will just create the relation based on existing data.
- createRelation(sqlContext, parameters)
- }
-
- relation
- }
-}
-
-/**
- * An alternative to [[ParquetRelation]] that plugs in using the data sources API. This class is
- * intended as a full replacement of the Parquet support in Spark SQL. The old implementation will
- * be deprecated and eventually removed once this version is proved to be stable enough.
- *
- * Compared with the old implementation, this class has the following notable differences:
- *
- * - Partitioning discovery: Hive style multi-level partitions are auto discovered.
- * - Metadata discovery: Parquet is a format comes with schema evolving support. This data source
- * can detect and merge schemas from all Parquet part-files as long as they are compatible.
- * Also, metadata and [[FileStatus]]es are cached for better performance.
- * - Statistics: Statistics for the size of the table are automatically populated during schema
- * discovery.
- */
-@DeveloperApi
-private[sql] case class ParquetRelation2(
- paths: Seq[String],
- parameters: Map[String, String],
- maybeSchema: Option[StructType] = None,
- maybePartitionSpec: Option[PartitionSpec] = None)(
- @transient val sqlContext: SQLContext)
- extends BaseRelation
- with CatalystScan
- with InsertableRelation
- with SparkHadoopMapReduceUtil
- with Logging {
-
- // Should we merge schemas from all Parquet part-files?
- private val shouldMergeSchemas =
- parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
-
- // Optional Metastore schema, used when converting Hive Metastore Parquet table
- private val maybeMetastoreSchema =
- parameters
- .get(ParquetRelation2.METASTORE_SCHEMA)
- .map(s => DataType.fromJson(s).asInstanceOf[StructType])
-
- // Hive uses this as part of the default partition name when the partition column value is null
- // or empty string
- private val defaultPartitionName = parameters.getOrElse(
- ParquetRelation2.DEFAULT_PARTITION_NAME, "__HIVE_DEFAULT_PARTITION__")
-
- override def equals(other: Any): Boolean = other match {
- case relation: ParquetRelation2 =>
- // If schema merging is required, we don't compare the actual schemas since they may evolve.
- val schemaEquality = if (shouldMergeSchemas) {
- shouldMergeSchemas == relation.shouldMergeSchemas
- } else {
- schema == relation.schema
- }
-
- paths.toSet == relation.paths.toSet &&
- schemaEquality &&
- maybeMetastoreSchema == relation.maybeMetastoreSchema &&
- maybePartitionSpec == relation.maybePartitionSpec
-
- case _ => false
- }
-
- override def hashCode(): Int = {
- if (shouldMergeSchemas) {
- com.google.common.base.Objects.hashCode(
- shouldMergeSchemas: java.lang.Boolean,
- paths.toSet,
- maybeMetastoreSchema,
- maybePartitionSpec)
- } else {
- com.google.common.base.Objects.hashCode(
- shouldMergeSchemas: java.lang.Boolean,
- schema,
- paths.toSet,
- maybeMetastoreSchema,
- maybePartitionSpec)
- }
- }
-
- private[sql] def sparkContext = sqlContext.sparkContext
-
- private class MetadataCache {
- // `FileStatus` objects of all "_metadata" files.
- private var metadataStatuses: Array[FileStatus] = _
-
- // `FileStatus` objects of all "_common_metadata" files.
- private var commonMetadataStatuses: Array[FileStatus] = _
-
- // Parquet footer cache.
- var footers: Map[FileStatus, Footer] = _
-
- // `FileStatus` objects of all data files (Parquet part-files).
- var dataStatuses: Array[FileStatus] = _
-
- // Partition spec of this table, including names, data types, and values of each partition
- // column, and paths of each partition.
- var partitionSpec: PartitionSpec = _
-
- // Schema of the actual Parquet files, without partition columns discovered from partition
- // directory paths.
- var parquetSchema: StructType = _
-
- // Schema of the whole table, including partition columns.
- var schema: StructType = _
-
- // Indicates whether partition columns are also included in Parquet data file schema. If not,
- // we need to fill in partition column values into read rows when scanning the table.
- var partitionKeysIncludedInParquetSchema: Boolean = _
-
- def prepareMetadata(path: Path, schema: StructType, conf: Configuration): Unit = {
- conf.set(
- ParquetOutputFormat.COMPRESSION,
- ParquetRelation
- .shortParquetCompressionCodecNames
- .getOrElse(
- sqlContext.conf.parquetCompressionCodec.toUpperCase,
- CompressionCodecName.UNCOMPRESSED).name())
-
- ParquetRelation.enableLogForwarding()
- ParquetTypesConverter.writeMetaData(schema.toAttributes, path, conf)
- }
-
- /**
- * Refreshes `FileStatus`es, footers, partition spec, and table schema.
- */
- def refresh(): Unit = {
- // Support either reading a collection of raw Parquet part-files, or a collection of folders
- // containing Parquet files (e.g. partitioned Parquet table).
- val baseStatuses = paths.distinct.map { p =>
- val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration)
- val path = new Path(p)
- val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
- if (!fs.exists(qualified) && maybeSchema.isDefined) {
- fs.mkdirs(qualified)
- prepareMetadata(qualified, maybeSchema.get, sparkContext.hadoopConfiguration)
- }
-
- fs.getFileStatus(qualified)
- }.toArray
- assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
- // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
- val leaves = baseStatuses.flatMap { f =>
- val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration)
- SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
- isSummaryFile(f.getPath) ||
- !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }
- }
-
- dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
- metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
- commonMetadataStatuses =
- leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
- footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
- val parquetMetadata = ParquetFileReader.readFooter(
- sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
- f -> new Footer(f.getPath, parquetMetadata)
- }.seq.toMap
-
- partitionSpec = maybePartitionSpec.getOrElse {
- val partitionDirs = leaves
- .filterNot(baseStatuses.contains)
- .map(_.getPath.getParent)
- .distinct
-
- if (partitionDirs.nonEmpty) {
- // Parses names and values of partition columns, and infer their data types.
- PartitioningUtils.parsePartitions(partitionDirs, defaultPartitionName)
- } else {
- // No partition directories found, makes an empty specification
- PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
- }
- }
-
- // To get the schema. We first try to get the schema defined in maybeSchema.
- // If maybeSchema is not defined, we will try to get the schema from existing parquet data
- // (through readSchema). If data does not exist, we will try to get the schema defined in
- // maybeMetastoreSchema (defined in the options of the data source).
- // Finally, if we still could not get the schema. We throw an error.
- parquetSchema =
- maybeSchema
- .orElse(readSchema())
- .orElse(maybeMetastoreSchema)
- .getOrElse(sys.error("Failed to get the schema."))
-
- partitionKeysIncludedInParquetSchema =
- isPartitioned &&
- partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
-
- schema = {
- val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
- parquetSchema
- } else {
- StructType(parquetSchema.fields ++ partitionColumns.fields)
- }
-
- // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
- // insensitivity issue and possible schema mismatch.
- maybeMetastoreSchema
- .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
- .getOrElse(fullRelationSchema)
- }
- }
-
- private def readSchema(): Option[StructType] = {
- // Sees which file(s) we need to touch in order to figure out the schema.
- val filesToTouch =
- // Always tries the summary files first if users don't require a merged schema. In this case,
- // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
- // groups information, and could be much smaller for large Parquet files with lots of row
- // groups.
- //
- // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
- // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
- // how to merge them correctly if some key is associated with different values in different
- // part-files. When this happens, Parquet simply gives up generating the summary file. This
-      // implies that if a summary file is present, then:
- //
- // 1. Either all part-files have exactly the same Spark SQL schema, or
- // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
- // their schemas may differ from each other).
- //
- // Here we tend to be pessimistic and take the second case into account. Basically this means
- // we can't trust the summary files if users require a merged schema, and must touch all part-
- // files to do the merge.
- if (shouldMergeSchemas) {
-        // Also includes summary files, because there might be empty partition directories.
- (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
- } else {
-        // Tries any "_common_metadata" first. Parquet files written by older versions of
-        // Parquet don't have this.
- commonMetadataStatuses.headOption
- // Falls back to "_metadata"
- .orElse(metadataStatuses.headOption)
-          // Summary file(s) not found; the Parquet file is either corrupted, or different part-
-          // files contain conflicting user-defined metadata (two or more values are associated
-          // with the same key in different files). In either case, we fall back to the first
-          // part-file and just assume all schemas are consistent.
- .orElse(dataStatuses.headOption)
- .toSeq
- }
-
- ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
- }
- }
-
- @transient private val metadataCache = new MetadataCache
- metadataCache.refresh()
-
- def partitionSpec: PartitionSpec = metadataCache.partitionSpec
-
- def partitionColumns: StructType = metadataCache.partitionSpec.partitionColumns
-
- def partitions: Seq[Partition] = metadataCache.partitionSpec.partitions
-
- def isPartitioned: Boolean = partitionColumns.nonEmpty
-
- private def partitionKeysIncludedInDataSchema = metadataCache.partitionKeysIncludedInParquetSchema
-
- private def parquetSchema = metadataCache.parquetSchema
-
- override def schema: StructType = metadataCache.schema
-
- private def isSummaryFile(file: Path): Boolean = {
- file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
- file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
- }
-
- // Skip type conversion
- override val needConversion: Boolean = false
-
- // TODO Should calculate per scan size
-  // It's common that a query only scans a fraction of a large Parquet file. Returning the size of
-  // the whole Parquet file disables some optimizations in this case (e.g. broadcast join).
- override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
-
- // This is mostly a hack so that we can use the existing parquet filter code.
- override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
- val job = new Job(sparkContext.hadoopConfiguration)
- ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
- val jobConf: Configuration = ContextUtil.getConfiguration(job)
-
- val selectedPartitions = prunePartitions(predicates, partitions)
- val selectedFiles = if (isPartitioned) {
- selectedPartitions.flatMap { p =>
- metadataCache.dataStatuses.filter(_.getPath.getParent.toString == p.path)
- }
- } else {
- metadataCache.dataStatuses.toSeq
- }
- val selectedFooters = selectedFiles.map(metadataCache.footers)
-
- // FileInputFormat cannot handle empty lists.
- if (selectedFiles.nonEmpty) {
-      // In order to encode the authority of a Path containing special characters such as /,
-      // we need to use the string returned by the URI of the path to create a new Path.
- val selectedPaths = selectedFiles.map(status => new Path(status.getPath.toUri.toString))
- FileInputFormat.setInputPaths(job, selectedPaths: _*)
- }
-
- // Try to push down filters when filter push-down is enabled.
- if (sqlContext.conf.parquetFilterPushDown) {
- val partitionColNames = partitionColumns.map(_.name).toSet
- predicates
- // Don't push down predicates which reference partition columns
- .filter { pred =>
- val referencedColNames = pred.references.map(_.name).toSet
- referencedColNames.intersect(partitionColNames).isEmpty
- }
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter)
- .reduceOption(FilterApi.and)
- .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
- }
-
- if (isPartitioned) {
- logInfo {
- val percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
- s"Reading $percentRead% of partitions"
- }
- }
-
- val requiredColumns = output.map(_.name)
- val requestedSchema = StructType(requiredColumns.map(schema(_)))
-
- // Store both requested and original schema in `Configuration`
- jobConf.set(
- RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
- convertToString(requestedSchema.toAttributes))
- jobConf.set(
- RowWriteSupport.SPARK_ROW_SCHEMA,
- convertToString(schema.toAttributes))
-
- // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- val useCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
- jobConf.set(SQLConf.PARQUET_CACHE_METADATA, useCache.toString)
-
- val baseRDD =
- new NewHadoopRDD(
- sparkContext,
- classOf[FilteringParquetRowInputFormat],
- classOf[Void],
- classOf[Row],
- jobConf) {
- val cacheMetadata = useCache
-
- @transient
- val cachedStatus = selectedFiles.map { st =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
- val newPath = new Path(st.getPath.toUri.toString)
-
- new FileStatus(
- st.getLen,
- st.isDir,
- st.getReplication,
- st.getBlockSize,
- st.getModificationTime,
- st.getAccessTime,
- st.getPermission,
- st.getOwner,
- st.getGroup,
- newPath)
- }
-
- @transient
- val cachedFooters = selectedFooters.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
- new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
- }
-
-
-      // Overridden so we can inject our own cached file statuses.
- override def getPartitions: Array[SparkPartition] = {
- val inputFormat = if (cacheMetadata) {
- new FilteringParquetRowInputFormat {
- override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
-
- override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
- }
- } else {
- new FilteringParquetRowInputFormat
- }
-
- val jobContext = newJobContext(getConf, jobId)
- val rawSplits = inputFormat.getSplits(jobContext)
-
- Array.tabulate[SparkPartition](rawSplits.size) { i =>
- new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
- }
- }
- }
-
- // The ordinals for partition keys in the result row, if requested.
- val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
- case (name, index) => index -> requiredColumns.indexOf(name)
- }.toMap.filter {
- case (_, index) => index >= 0
- }
-
-    // When the data does not include the key and the key is requested, we must fill it in
- // based on information from the input split.
- if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
- // This check is based on CatalystConverter.createRootConverter.
- val primitiveRow =
- requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
-
- baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
- val partValues = selectedPartitions.collectFirst {
- case p if split.getPath.getParent.toString == p.path =>
- CatalystTypeConverters.convertToCatalyst(p.values).asInstanceOf[Row]
- }.get
-
- val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
-
- if (primitiveRow) {
- iterator.map { pair =>
- // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
- val row = pair._2.asInstanceOf[SpecificMutableRow]
- var i = 0
- while (i < requiredPartOrdinal.size) {
- // TODO Avoids boxing cost here!
- val partOrdinal = requiredPartOrdinal(i)
- row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
- i += 1
- }
- row
- }
- } else {
- // Create a mutable row since we need to fill in values from partition columns.
- val mutableRow = new GenericMutableRow(requestedSchema.size)
- iterator.map { pair =>
- // We are using CatalystGroupConverter and it returns a GenericRow.
- // Since GenericRow is not mutable, we just cast it to a Row.
- val row = pair._2.asInstanceOf[Row]
- var i = 0
- while (i < row.size) {
- // TODO Avoids boxing cost here!
- mutableRow(i) = row(i)
- i += 1
- }
-
- i = 0
- while (i < requiredPartOrdinal.size) {
- // TODO Avoids boxing cost here!
- val partOrdinal = requiredPartOrdinal(i)
- mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
- i += 1
- }
- mutableRow
- }
- }
- }
- } else {
- baseRDD.map(_._2)
- }
- }
-
- private def prunePartitions(
- predicates: Seq[Expression],
- partitions: Seq[Partition]): Seq[Partition] = {
- val partitionColumnNames = partitionColumns.map(_.name).toSet
- val partitionPruningPredicates = predicates.filter {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
-
- val rawPredicate =
- partitionPruningPredicates.reduceOption(expressions.And).getOrElse(Literal(true))
- val boundPredicate = InterpretedPredicate.create(rawPredicate transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable = true)
- })
-
- if (isPartitioned && partitionPruningPredicates.nonEmpty) {
- partitions.filter(p => boundPredicate(p.values))
- } else {
- partitions
- }
- }
-
- override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- assert(paths.size == 1, s"Can't write to multiple destinations: ${paths.mkString(",")}")
-
-    // TODO: currently we do not check whether the schemas are compatible.
-    // That means if one first creates a table and then INSERTs data with
-    // an incompatible schema, the execution will fail. It would be nice
-    // to catch this early on, maybe by having the planner validate the schema
-    // before calling execute().
-
- val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val writeSupport =
- if (parquetSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
- log.debug("Initializing MutableRowWriteSupport")
- classOf[MutableRowWriteSupport]
- } else {
- classOf[RowWriteSupport]
- }
-
- ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
-
- val conf = ContextUtil.getConfiguration(job)
- RowWriteSupport.setSchema(data.schema.toAttributes, conf)
-
- val destinationPath = new Path(paths.head)
-
- if (overwrite) {
- val fs = destinationPath.getFileSystem(conf)
- if (fs.exists(destinationPath)) {
- var success: Boolean = false
- try {
- success = fs.delete(destinationPath, true)
- } catch {
- case e: IOException =>
- throw new IOException(
- s"Unable to clear output directory ${destinationPath.toString} prior" +
- s" to writing to Parquet table:\n${e.toString}")
- }
- if (!success) {
- throw new IOException(
- s"Unable to clear output directory ${destinationPath.toString} prior" +
- s" to writing to Parquet table.")
- }
- }
- }
-
- job.setOutputKeyClass(classOf[Void])
- job.setOutputValueClass(classOf[Row])
- FileOutputFormat.setOutputPath(job, destinationPath)
-
- val wrappedConf = new SerializableWritable(job.getConfiguration)
- val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
- val stageId = sqlContext.sparkContext.newRddId()
-
- val taskIdOffset = if (overwrite) {
- 1
- } else {
- FileSystemHelper.findMaxTaskId(
- FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
- }
-
- def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
- /* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = newTaskAttemptID(
- jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
- val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
- val format = new AppendingParquetOutputFormat(taskIdOffset)
- val committer = format.getOutputCommitter(hadoopContext)
- committer.setupTask(hadoopContext)
- val writer = format.getRecordWriter(hadoopContext)
- try {
- while (iterator.hasNext) {
- val row = iterator.next()
- writer.write(null, row)
- }
- } finally {
- writer.close(hadoopContext)
- }
-
- SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
- }
- val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
- /* apparently we need a TaskAttemptID to construct an OutputCommitter;
- * however we're only going to use this local OutputCommitter for
- * setupJob/commitJob, so we just use a dummy "map" task.
- */
- val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
- val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
- val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-
- jobCommitter.setupJob(jobTaskContext)
- sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
- jobCommitter.commitJob(jobTaskContext)
-
- metadataCache.refresh()
- }
-}
-
-private[sql] object ParquetRelation2 extends Logging {
- // Whether we should merge schemas collected from all Parquet part-files.
- val MERGE_SCHEMA = "mergeSchema"
-
- // Default partition name to use when the partition column value is null or empty string.
- val DEFAULT_PARTITION_NAME = "partition.defaultName"
-
- // Hive Metastore schema, used when converting Metastore Parquet tables. This option is only used
- // internally.
- private[sql] val METASTORE_SCHEMA = "metastoreSchema"
-
- private[parquet] def readSchema(
- footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
- footers.map { footer =>
- val metadata = footer.getParquetMetadata.getFileMetaData
- val parquetSchema = metadata.getSchema
- val maybeSparkSchema = metadata
- .getKeyValueMetaData
- .toMap
- .get(RowReadSupport.SPARK_METADATA_KEY)
- .flatMap { serializedSchema =>
-        // Don't throw even if we failed to parse the serialized Spark schema. Just fall back to
- // whatever is available.
- Try(DataType.fromJson(serializedSchema))
- .recover { case _: Throwable =>
- logInfo(
- s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
- "falling back to the deprecated DataType.fromCaseClassString parser.")
- DataType.fromCaseClassString(serializedSchema)
- }
- .recover { case cause: Throwable =>
- logWarning(
- s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
- |\t$serializedSchema
- """.stripMargin,
- cause)
- }
- .map(_.asInstanceOf[StructType])
- .toOption
- }
-
- maybeSparkSchema.getOrElse {
- // Falls back to Parquet schema if Spark SQL schema is absent.
- StructType.fromAttributes(
- // TODO Really no need to use `Attribute` here, we only need to know the data type.
- convertToAttributes(
- parquetSchema,
- sqlContext.conf.isParquetBinaryAsString,
- sqlContext.conf.isParquetINT96AsTimestamp))
- }
- }.reduceOption { (left, right) =>
- try left.merge(right) catch { case e: Throwable =>
- throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
- }
- }
- }
-
- /**
- * Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
- * schema and Parquet schema.
- *
- * Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
-   * distinguish binary and string). This method generates a correct schema by merging Metastore
- * schema data types and Parquet schema field names.
- */
- private[parquet] def mergeMetastoreParquetSchema(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- def schemaConflictMessage: String =
- s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
- |${metastoreSchema.prettyJson}
- |
- |Parquet schema:
- |${parquetSchema.prettyJson}
- """.stripMargin
-
- val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
-
- assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)
-
- val ordinalMap = metastoreSchema.zipWithIndex.map {
- case (field, index) => field.name.toLowerCase -> index
- }.toMap
- val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
- ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
- StructType(metastoreSchema.zip(reorderedParquetSchema).map {
- // Uses Parquet field names but retains Metastore data types.
- case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
- mSchema.copy(name = pSchema.name)
- case _ =>
- throw new SparkException(schemaConflictMessage)
- })
- }
-
- /**
- * Returns the original schema from the Parquet file with any missing nullable fields from the
- * Hive Metastore schema merged in.
- *
- * When constructing a DataFrame from a collection of structured data, the resulting object has
- * a schema corresponding to the union of the fields present in each element of the collection.
- * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
- * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
- * contain a particular nullable field in its schema despite that field being present in the
- * table schema obtained from the Hive Metastore. This method returns a schema representing the
- * Parquet file schema along with any additional nullable fields from the Metastore schema
- * merged in.
- */
- private[parquet] def mergeMissingNullableFields(
- metastoreSchema: StructType,
- parquetSchema: StructType): StructType = {
- val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
- val missingFields = metastoreSchema
- .map(_.name.toLowerCase)
- .diff(parquetSchema.map(_.name.toLowerCase))
- .map(fieldMap(_))
- .filter(_.nullable)
- StructType(parquetSchema ++ missingFields)
- }
-}
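
The two helpers above (mergeMetastoreParquetSchema and mergeMissingNullableFields) survive the migration unchanged as members of FSBasedParquetRelation (see the ParquetSchemaSuite changes below). As a rough, standalone sketch of that reconciliation logic only, the following uses a hypothetical Field stand-in instead of Spark's StructField/StructType and tracks just names and nullability:

case class Field(name: String, nullable: Boolean)

object SchemaReconciliationSketch {
  // Adds back nullable Metastore fields that are missing from the Parquet schema.
  def mergeMissingNullableFields(metastore: Seq[Field], parquet: Seq[Field]): Seq[Field] = {
    val byLowerName = metastore.map(f => f.name.toLowerCase -> f).toMap
    val missing = metastore
      .map(_.name.toLowerCase)
      .diff(parquet.map(_.name.toLowerCase))
      .map(byLowerName)
      .filter(_.nullable)
    parquet ++ missing
  }

  // Keeps Parquet's case-preserving field names but follows the Metastore field order.
  def reconcile(metastore: Seq[Field], parquet: Seq[Field]): Seq[Field] = {
    val merged = mergeMissingNullableFields(metastore, parquet)
    require(metastore.size <= merged.size, "conflicting schemas")
    val ordinal = metastore.map(_.name.toLowerCase).zipWithIndex.toMap
    val reordered = merged.sortBy(f => ordinal.getOrElse(f.name.toLowerCase, metastore.size + 1))
    metastore.zip(reordered).map {
      case (m, p) if m.name.toLowerCase == p.name.toLowerCase => m.copy(name = p.name)
      case _ => throw new IllegalArgumentException("conflicting schemas")
    }
  }
}

// Example: the lower-cased Metastore name picks up Parquet's original casing, and the
// nullable field "extra" that is absent from the Parquet file is merged back in:
//   reconcile(Seq(Field("lowercase", true), Field("extra", true)), Seq(Field("lowerCase", true)))
//   => Seq(Field("lowerCase", true), Field("extra", true))

The real implementation additionally retains Metastore data types for the matched fields, which the simplified Field type above does not model.
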
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a294297677..7879328bba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -293,9 +293,18 @@ private[sql] abstract class BaseWriterContainer(
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- outputFormatClass.newInstance().getOutputCommitter(context)
+ val committerClass = context.getConfiguration.getClass(
+ "mapred.output.committer.class", null, classOf[OutputCommitter])
+
+ Option(committerClass).map { clazz =>
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ }.getOrElse {
+ outputFormatClass.newInstance().getOutputCommitter(context)
+ }
}
+
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
@@ -345,6 +354,7 @@ private[sql] class DefaultWriterContainer(
override protected def initWriters(): Unit = {
writer = outputWriterClass.newInstance()
+ taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
writer.init(getWorkPath, dataSchema, taskAttemptContext)
}
@@ -384,11 +394,14 @@ private[sql] class DynamicPartitionWriterContainer(
DynamicPartitionWriterContainer.escapePathName(string)
}
s"/$col=$valueString"
- }.mkString
+ }.mkString.stripPrefix(Path.SEPARATOR)
outputWriters.getOrElseUpdate(partitionPath, {
- val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
+ val path = new Path(getWorkPath, partitionPath)
val writer = outputWriterClass.newInstance()
+ taskAttemptContext.getConfiguration.set(
+ "spark.sql.sources.output.path",
+ new Path(outputPath, partitionPath).toString)
writer.init(path.toString, dataSchema, taskAttemptContext)
writer
})
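
The reworked newOutputCommitter above prefers a committer class configured via "mapred.output.committer.class", instantiating it reflectively through a (Path, TaskAttemptContext) constructor and falling back to the output format's default committer otherwise. A minimal standalone sketch of that selection pattern, using hypothetical Committer/Context stand-ins rather than Hadoop's actual types:

trait Committer
final class Context
final class DefaultCommitter extends Committer
final class DirectCommitter(outputPath: String, ctx: Context) extends Committer

object CommitterSelectionSketch {
  // Prefer a configured committer class, instantiated through a two-argument constructor,
  // and fall back to a default committer when nothing is configured.
  def newCommitter(configuredClassName: Option[String], outputPath: String, ctx: Context): Committer =
    configuredClassName.map { name =>
      val ctor = Class.forName(name).getDeclaredConstructor(classOf[String], classOf[Context])
      ctor.newInstance(outputPath, ctx).asInstanceOf[Committer]
    }.getOrElse(new DefaultCommitter)

  def main(args: Array[String]): Unit = {
    val fallback = newCommitter(None, "/tmp/out", new Context)
    val direct = newCommitter(Some(classOf[DirectCommitter].getName), "/tmp/out", new Context)
    println(s"${fallback.getClass.getSimpleName}, ${direct.getClass.getSimpleName}")
  }
}
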
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 10d0ede4dc..3bbc5b0586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}.flatten.reduceOption(_ && _)
val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
+ case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
}.flatten.reduceOption(_ && _)
forParquetTableScan.orElse(forParquetDataSource)
@@ -350,7 +350,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
override protected def afterAll(): Unit = {
sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
-
+
test("SPARK-6742: don't push down predicates which reference partition columns") {
import sqlContext.implicits._
@@ -365,7 +365,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
path,
Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
Seq(AttributeReference("part", IntegerType, false)()) ))
-
+
checkAnswer(
df.filter("a = 1 or part = 1"),
(1 to 3).map(i => Row(1, i, i.toString)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index b504842053..7c371dbc7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -119,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
// Decimals with precision above 18 are not yet supported
- intercept[RuntimeException] {
+ intercept[Throwable] {
withTempPath { dir =>
makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
@@ -127,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
// Unlimited-length decimals are not yet supported
- intercept[RuntimeException] {
+ intercept[Throwable] {
withTempPath { dir =>
makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
@@ -419,7 +419,7 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
test("SPARK-6330 regression test") {
// In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
// IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
- intercept[java.io.FileNotFoundException] {
+ intercept[Throwable] {
sqlContext.parquetFile("file:///nonexistent")
}
val errorMessage = intercept[Throwable] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index bea568ed40..138e19766d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -39,7 +39,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
import sqlContext._
import sqlContext.implicits._
- val defaultPartitionName = "__NULL__"
+ val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
test("column type inference") {
def check(raw: String, literal: Literal): Unit = {
@@ -252,9 +252,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
val parquetRelation = load(
"org.apache.spark.sql.parquet",
- Map(
- "path" -> base.getCanonicalPath,
- ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
+ Map("path" -> base.getCanonicalPath))
parquetRelation.registerTempTable("t")
@@ -297,9 +295,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
val parquetRelation = load(
"org.apache.spark.sql.parquet",
- Map(
- "path" -> base.getCanonicalPath,
- ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
+ Map("path" -> base.getCanonicalPath))
parquetRelation.registerTempTable("t")
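
The hunks above drop the explicit ParquetRelation2.DEFAULT_PARTITION_NAME option and switch the suite to Hive's __HIVE_DEFAULT_PARTITION__. As a rough standalone sketch (not PartitioningUtils.parsePartitions itself), partition discovery effectively turns Hive-style directory names into column/value pairs, with the default partition name standing in for a null value:

object PartitionPathSketch {
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Returns (columnName, Some(value)) pairs; None marks the default/null partition.
  def parse(relativePath: String): Seq[(String, Option[String])] =
    relativePath.split("/").toSeq.filter(_.contains("=")).map { segment =>
      val Array(column, raw) = segment.split("=", 2)
      column -> (if (raw == DefaultPartitionName) None else Some(raw))
    }

  def main(args: Array[String]): Unit = {
    println(parse("p1=10/p2=hello"))                   // (p1,Some(10)), (p2,Some(hello))
    println(parse(s"p1=$DefaultPartitionName/p2=foo")) // (p1,None), (p2,Some(foo))
  }
}
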
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index c964b6d984..fc90e3edce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Conflicting non-nullable field names
intercept[Throwable] {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
- ParquetRelation2.mergeMetastoreParquetSchema(
+ FSBasedParquetRelation.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d754c8e3a8..b0e82c8d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
import org.apache.spark.util.Utils
/* Implicit conversions */
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+ FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+ FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -238,13 +238,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+ case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
- parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+ parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
+ PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+ }
if (useCached) {
Some(logical)
@@ -256,7 +258,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
- s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+ s"as Parquet. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
@@ -278,8 +280,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
- val created =
- LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+ val created = LogicalRelation(
+ new FSBasedParquetRelation(
+ paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@@ -290,8 +293,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
- val created =
- LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+ val created = LogicalRelation(
+ new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 47c60f651d..da5d203d9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,21 +21,18 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.BeforeAndAfterEach
-
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql._
-import org.apache.spark.util.Utils
-import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* Tests for persisting tables created though the data sources API into the metastore.
@@ -582,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
+ case LogicalRelation(p: FSBasedParquetRelation) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[ParquetRelation2].getCanonicalName}")
+ s"${classOf[FSBasedParquetRelation].getCanonicalName}")
}
// Cleanup and reset confs.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a5744ccc68..1d6393a3fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -19,16 +19,14 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.DefaultParserDialect
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
-import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf}
case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
@@ -176,17 +174,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: ParquetRelation2) =>
+ case LogicalRelation(r: FSBasedParquetRelation) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
- s"${ParquetRelation2.getClass.getCanonicalName}.")
+ s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
- s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+ s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
@@ -596,7 +594,7 @@ class SQLQuerySuite extends QueryTest {
sql(s"DROP TABLE $tableName")
}
}
-
+
test("SPARK-5203 union with different decimal precision") {
Seq.empty[(Decimal, Decimal)]
.toDF("d1", "d2")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index bf1121ddf0..41bcbe84b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,16 +21,15 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.{QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
-import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
@@ -292,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
- case _ =>
- fail(
- s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ case LogicalRelation(_: FSBasedParquetRelation) => // OK
+ case _ => fail(
+ "test_parquet_ctas should be converted to " +
+ s"${classOf[FSBasedParquetRelation].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -316,12 +315,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(
- InsertIntoDataSource(
- LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+ case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
- s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+ s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+ s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@@ -348,11 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(
- InsertIntoDataSource(
- LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+ case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@@ -383,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: ParquetRelation2) => r
+ case r @ LogicalRelation(_: FSBasedParquetRelation) => r
}.size
}
@@ -395,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+ case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
@@ -693,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
- intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+ intercept[Throwable](df2.saveAsParquetFile(filePath))
val df3 = df2.toDF("str", "max_int")
df3.saveAsParquetFile(filePath2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
index e8b48a0db1..394833f229 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types._
// TODO Don't extend ParquetTest
// This test suite extends ParquetTest for some convenient utility methods. These methods should be
// moved to some more general places, maybe QueryTest.
-class FSBasedRelationSuite extends QueryTest with ParquetTest {
+class FSBasedRelationTest extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestHive
import sqlContext._
import sqlContext.implicits._
+ val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+
val dataSchema =
StructType(
Seq(
@@ -92,17 +94,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempPath { file =>
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -114,17 +116,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempPath { file =>
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append)
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)).orderBy("a"),
@@ -137,7 +139,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[RuntimeException] {
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists)
}
}
@@ -147,7 +149,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore)
val path = new Path(file.getCanonicalPath)
@@ -159,62 +161,37 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - simple queries") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkQueries(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)))
}
}
- test("save()/load() - partitioned table - simple queries - partition columns in data") {
- withTempDir { file =>
- val basePath = new Path(file.getCanonicalPath)
- val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
- val qualifiedBasePath = fs.makeQualified(basePath)
-
- for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
- val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
- sparkContext
- .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
- .saveAsTextFile(partitionDir.toString)
- }
-
- val dataSchemaWithPartition =
- StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
- checkQueries(
- load(
- source = classOf[SimpleTextSource].getCanonicalName,
- options = Map(
- "path" -> file.getCanonicalPath,
- "dataSchema" -> dataSchemaWithPartition.json)))
- }
- }
-
test("save()/load() - partitioned table - Overwrite") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -225,20 +202,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Append") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -249,20 +226,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Append - new partition values") {
withTempPath { file =>
partitionedTestDF1.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF2.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -274,7 +251,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
intercept[RuntimeException] {
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
@@ -286,7 +263,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
partitionedTestDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore)
val path = new Path(file.getCanonicalPath)
@@ -302,7 +279,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
@@ -314,12 +291,12 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - non-partitioned table - Append") {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append)
withTable("t") {
@@ -334,7 +311,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[AnalysisException] {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists)
}
}
@@ -346,7 +323,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempTable("t") {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore)
assert(table("t").collect().isEmpty)
@@ -356,7 +333,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - simple queries") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
@@ -368,14 +345,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Overwrite") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -388,14 +365,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -408,14 +385,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append - new partition values") {
partitionedTestDF1.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF2.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -428,7 +405,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
partitionedTestDF1.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -437,7 +414,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[Throwable] {
partitionedTestDF2.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1"))
@@ -447,7 +424,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[Throwable] {
partitionedTestDF2.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p2", "p1"))
@@ -461,7 +438,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[AnalysisException] {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -475,7 +452,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempTable("t") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -487,13 +464,13 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("Hadoop style globbing") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
val df = load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
"dataSchema" -> dataSchema.json))
@@ -521,3 +498,67 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
}
}
}
+
+class SimpleTextRelationSuite extends FSBasedRelationTest {
+ override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+ import sqlContext._
+
+ test("save()/load() - partitioned table - simple queries - partition columns in data") {
+ withTempDir { file =>
+ val basePath = new Path(file.getCanonicalPath)
+ val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+ val qualifiedBasePath = fs.makeQualified(basePath)
+
+ for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+ val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ sparkContext
+ .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+ .saveAsTextFile(partitionDir.toString)
+ }
+
+ val dataSchemaWithPartition =
+ StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+ checkQueries(
+ load(
+ source = dataSourceName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchemaWithPartition.json)))
+ }
+ }
+}
+
+class FSBasedParquetRelationSuite extends FSBasedRelationTest {
+ override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+
+ import sqlContext._
+ import sqlContext.implicits._
+
+ test("save()/load() - partitioned table - simple queries - partition columns in data") {
+ withTempDir { file =>
+ val basePath = new Path(file.getCanonicalPath)
+ val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+ val qualifiedBasePath = fs.makeQualified(basePath)
+
+ for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+ val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ sparkContext
+ .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
+ .toDF("a", "b", "p1")
+ .saveAsParquetFile(partitionDir.toString)
+ }
+
+ val dataSchemaWithPartition =
+ StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+ checkQueries(
+ load(
+ source = dataSourceName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchemaWithPartition.json)))
+ }
+ }
+}
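
Both new suites add a "partition columns in data" test that lays data out under Hive-style p1=.../p2=... directories while also repeating the p1 value inside each row. A small standalone sketch of that on-disk layout, using only java.nio (directory and file names here are illustrative):

import java.nio.charset.StandardCharsets
import java.nio.file.Files

object PartitionedLayoutSketch {
  def main(args: Array[String]): Unit = {
    val base = Files.createTempDirectory("fs-based-relation-sketch")
    for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
      // Each partition directory holds rows of the form "<a>,<b>,<p1>", so the
      // partition column p1 is duplicated inside the data files.
      val dir = Files.createDirectories(base.resolve(s"p1=$p1/p2=$p2"))
      val rows = (1 to 3).map(i => s"$i,val_$i,$p1").mkString("\n")
      Files.write(dir.resolve("part-00000"), rows.getBytes(StandardCharsets.UTF_8))
    }
    // Layout: <base>/p1=1/p2=foo/part-00000, <base>/p1=1/p2=bar/part-00000, ...
    println(s"Wrote sample partitioned layout under $base")
  }
}
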