 sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala             | 26
 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala               |  2
 sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala                    | 19
 sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala             | 31
 sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala              |  7
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      | 49
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala        |  2
 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala           | 35
 sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 30
 sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala             | 28
 sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala     |  2
 sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 36
 12 files changed, 211 insertions(+), 56 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c35b7eff82..32986aa3ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -49,8 +49,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
- val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
- new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+ new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext)
}
}
@@ -118,12 +117,28 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
private[sql] class ParquetRelation2(
override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
+ // This is for metastore conversion.
private val maybePartitionSpec: Option[PartitionSpec],
+ override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {
+ private[sql] def this(
+ paths: Array[String],
+ maybeDataSchema: Option[StructType],
+ maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])(
+ sqlContext: SQLContext) = {
+ this(
+ paths,
+ maybeDataSchema,
+ maybePartitionSpec,
+ maybePartitionSpec.map(_.partitionColumns),
+ parameters)(sqlContext)
+ }
+
// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
@@ -161,7 +176,7 @@ private[sql] class ParquetRelation2(
Boolean.box(shouldMergeSchemas),
paths.toSet,
maybeDataSchema,
- maybePartitionSpec)
+ partitionColumns)
} else {
Objects.hashCode(
Boolean.box(shouldMergeSchemas),
@@ -169,7 +184,7 @@ private[sql] class ParquetRelation2(
dataSchema,
schema,
maybeDataSchema,
- maybePartitionSpec)
+ partitionColumns)
}
}
@@ -185,9 +200,6 @@ private[sql] class ParquetRelation2(
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
- override def userDefinedPartitionColumns: Option[StructType] =
- maybePartitionSpec.map(_.partitionColumns)
-
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)
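
The Parquet provider no longer fabricates an empty PartitionSpec from the user-defined partition columns; it forwards them to the new constructor parameter and lets the relation discover partition values lazily. Below is a minimal sketch of constructing the relation this way, assuming the code is compiled inside the org.apache.spark.sql package tree (the class and its constructors are private[sql]); the path and column name are hypothetical:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

def exampleRelation(sqlContext: SQLContext): ParquetRelation2 = {
  // Only the partition columns are declared up front; their values are discovered
  // from the directory layout when partitionSpec is first materialized.
  val partitionColumns = StructType(StructField("p1", IntegerType, nullable = true) :: Nil)
  new ParquetRelation2(
    Array("/tmp/parquet-table"),     // hypothetical data path
    None,                            // data schema is read from the Parquet footers
    None,                            // no pre-computed PartitionSpec (metastore conversion only)
    Some(partitionColumns),          // user-defined partition columns
    Map.empty[String, String])(sqlContext)
}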
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d54dbb0831..498f7538d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -93,6 +93,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
job.setOutputValueClass(classOf[Row])
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+ // We create a DataFrame by applying the schema of the relation to the data to make sure
+ // we are writing data based on the expected schema.
val df = sqlContext.createDataFrame(
DataFrame(sqlContext, query).queryExecution.toRdd,
relation.schema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index a13ab74852..5e723122ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.RunnableCommand
@@ -245,12 +245,13 @@ private[sql] object ResolvedDataSource {
SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
}
- val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name)))
+ val dataSchema =
+ StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
dataSource.createRelation(
sqlContext,
paths,
- Some(schema),
+ Some(dataSchema),
maybePartitionsSchema,
caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
@@ -320,10 +321,20 @@ private[sql] object ResolvedDataSource {
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
+
+ // For a partitioned relation r, the column ordering of r.schema differs from the column
+ // ordering of data.logicalPlan. We need a Project to adjust the ordering, so that inside
+ // InsertIntoHadoopFsRelation we can safely apply r.schema to the data.
+ val project =
+ Project(
+ r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
+ data.logicalPlan)
+
sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
- data.logicalPlan,
+ project,
partitionColumns.toArray,
mode)).toRdd
r
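
The added Project reorders the query output to the relation's column order (data columns first, partition columns last) before the insert, so InsertIntoHadoopFsRelation can apply r.schema to the rows directly. A minimal DataFrame-level sketch of the same idea; reorderToSchema is a hypothetical helper, not part of this patch:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.types.StructType

def reorderToSchema(data: DataFrame, targetSchema: StructType): DataFrame = {
  // Select the input columns by name in exactly the order declared by the target
  // relation's schema, mirroring the Project over UnresolvedAttributes above.
  val ordered: Seq[Column] = targetSchema.map(field => data.col(field.name))
  data.select(ordered: _*)
}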
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index fcbac0d457..61fc4e5c19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.SerializableWritable
-import org.apache.spark.sql._
+import org.apache.spark.sql.{Row, _}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.{StructField, StructType}
@@ -120,11 +120,13 @@ trait HadoopFsRelationProvider {
* Returns a new base relation with the given parameters, a user defined schema, and a list of
* partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
* is enforced by the Map that is passed to the function.
+ *
+ * @param dataSchema Schema of data columns (i.e., columns that are not partition columns).
*/
def createRelation(
sqlContext: SQLContext,
paths: Array[String],
- schema: Option[StructType],
+ dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation
}
@@ -416,8 +418,29 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
final private[sql] def partitionSpec: PartitionSpec = {
if (_partitionSpec == null) {
_partitionSpec = maybePartitionSpec
- .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
- .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+ .flatMap {
+ case spec if spec.partitions.nonEmpty =>
+ Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+ case _ =>
+ None
+ }
+ .orElse {
+ // We only know the partition columns and their data types. We need to discover
+ // partition values.
+ userDefinedPartitionColumns.map { partitionSchema =>
+ val spec = discoverPartitions()
+ val castedPartitions = spec.partitions.map { case p @ Partition(values, path) =>
+ val literals = values.toSeq.zip(spec.partitionColumns.map(_.dataType)).map {
+ case (value, dataType) => Literal.create(value, dataType)
+ }
+ val castedValues = partitionSchema.zip(literals).map { case (field, literal) =>
+ Cast(literal, field.dataType).eval()
+ }
+ p.copy(values = Row.fromSeq(castedValues))
+ }
+ PartitionSpec(partitionSchema, castedPartitions)
+ }
+ }
.getOrElse {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
discoverPartitions()
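
When only the partition columns are user-defined, the relation still runs partition discovery and then casts each discovered value to the declared column type. A minimal sketch of that per-value casting step in isolation, using the same Catalyst expressions as the code above; the literal value is hypothetical:

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType}

// Suppose a partition value was discovered from a path such as ".../year=2015" as a string.
val discovered = Literal.create("2015", StringType)
// Cast it to the type the user declared for that partition column.
val casted = Cast(discovered, IntegerType).eval()   // evaluates to the Int 2015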
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 75d290625e..ca66cdc482 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -78,4 +78,11 @@ trait SQLTestUtils {
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}
+
+ /**
+ * Drops table `tableName` after calling `f`.
+ */
+ protected def withTable(tableName: String)(f: => Unit): Unit = {
+ try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+ }
}
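
A hypothetical usage sketch for the new helper: the DROP TABLE in the finally block runs even if the body throws, keeping metastore state clean between tests. It assumes a test that mixes in this trait with a HiveContext-backed sqlContext (saveAsTable needs a persistent catalog):

withTable("t") {
  // Hypothetical test body: persist a small table, then read it back.
  sqlContext.range(0, 10).write.format("parquet").saveAsTable("t")
  assert(sqlContext.table("t").count() === 10)
}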
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2aa80b47a9..5b6840008f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -66,11 +66,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def schemaStringFromParts: Option[String] = {
table.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
- val part = table.properties.get(s"spark.sql.sources.schema.part.${index}").orNull
+ val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
if (part == null) {
throw new AnalysisException(
- s"Could not read schema from the metastore because it is corrupted " +
- s"(missing part ${index} of the schema).")
+ "Could not read schema from the metastore because it is corrupted " +
+ s"(missing part $index of the schema, $numParts parts are expected).")
}
part
@@ -89,6 +89,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val userSpecifiedSchema =
schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
+ // We only need the column names here, since the userSpecifiedSchema we loaded from the
+ // metastore contains the partition columns. We can always get the data types of the
+ // partition columns from userSpecifiedSchema.
+ val partitionColumns = table.partitionColumns.map(_.name)
+
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.serdeProperties
@@ -97,7 +102,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
ResolvedDataSource(
hive,
userSpecifiedSchema,
- Array.empty[String],
+ partitionColumns.toArray,
table.properties("spark.sql.sources.provider"),
options)
@@ -111,8 +116,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
override def refreshTable(databaseName: String, tableName: String): Unit = {
// refreshTable does not eagerly reload the cache. It just invalidate the cache.
// Next time when we use the table, it will be populated in the cache.
- // Since we also cache ParquetRealtions converted from Hive Parquet tables and
- // adding converted ParquetRealtions into the cache is not defined in the load function
+ // Since we also cache ParquetRelations converted from Hive Parquet tables and
+ // adding converted ParquetRelations into the cache is not defined in the load function
// of the cache (instead, we add the cache entry in convertToParquetRelation),
// it is better to invalidate the cache here to avoid confusing warning logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
@@ -133,12 +138,17 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
+ partitionColumns: Array[String],
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)
+
+ // Saves the optional user-specified schema. The serialized JSON schema string may be too
+ // long to be stored in a single metastore SerDe property. In this case, we split the JSON
+ // string and store each part as a separate SerDe property.
if (userSpecifiedSchema.isDefined) {
val threshold = conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
@@ -146,8 +156,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
- tableProperties.put(s"spark.sql.sources.schema.part.${index}", part)
+ tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+ }
+ }
+
+ val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
+ val fields = partitionColumns.map(col => schema(col))
+ fields.map { field =>
+ HiveColumn(
+ name = field.name,
+ hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
+ comment = "")
+ }.toSeq
+ }.getOrElse {
+ if (partitionColumns.length > 0) {
+ // The table does not have a user-specified schema, which means the schema will be
+ // inferred when we load the table. So we do not expect partition columns here, and we
+ // will discover partitions when we load the table. However, if partition columns were
+ // specified, we simply ignore them and log a warning message.
+ logWarning(
+ s"The schema and partitions of table $tableName will be inferred when it is loaded. " +
+ s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}
+ Seq.empty[HiveColumn]
}
val tableType = if (isExternal) {
@@ -163,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
specifiedDatabase = Option(dbName),
name = tblName,
schema = Seq.empty,
- partitionColumns = Seq.empty,
+ partitionColumns = metastorePartitionColumns,
tableType = tableType,
properties = tableProperties.toMap,
serdeProperties = options))
@@ -199,7 +230,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val dataSourceTable =
cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
- // Othersie, wrap the table with a Subquery using the table name.
+ // Otherwise, wrap the table with a Subquery using the table name.
val withAlias =
alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
Subquery(tableIdent.last, dataSourceTable))
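
The schema is persisted by chunking its JSON representation across numbered table properties, because a single metastore property value has a length limit. A minimal, self-contained sketch of the same scheme with hypothetical values (the real threshold comes from conf.schemaStringLengthThreshold):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
val threshold = 40                               // hypothetical; normally read from the conf
val parts = schema.json.grouped(threshold).toSeq

val props = scala.collection.mutable.HashMap[String, String]()
props.put("spark.sql.sources.schema.numParts", parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
  props.put(s"spark.sql.sources.schema.part.$index", part)
}
// Read side: concatenate part.0 .. part.(numParts - 1) and parse with DataType.fromJson.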
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6609763343..0ba94d7b7c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -146,6 +146,7 @@ case class CreateMetastoreDataSource(
hiveContext.catalog.createDataSourceTable(
tableName,
userSpecifiedSchema,
+ Array.empty[String],
provider,
optionsWithPath,
isExternal)
@@ -244,6 +245,7 @@ case class CreateMetastoreDataSourceAsSelect(
hiveContext.catalog.createDataSourceTable(
tableName,
Some(resolved.relation.schema),
+ partitionColumns,
provider,
optionsWithPath,
isExternal)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index b69e14a179..f03c4cd54e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -48,15 +48,14 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
def createRelation(
sqlContext: SQLContext,
paths: Array[String],
- schema: Option[StructType],
+ dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
assert(
sqlContext.isInstanceOf[HiveContext],
"The ORC data source can only be used with HiveContext.")
- val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty[Partition]))
- OrcRelation(paths, parameters, schema, partitionSpec)(sqlContext)
+ new OrcRelation(paths, dataSchema, None, partitionColumns, parameters)(sqlContext)
}
}
@@ -136,23 +135,35 @@ private[orc] class OrcOutputWriter(
}
@DeveloperApi
-private[sql] case class OrcRelation(
+private[sql] class OrcRelation(
override val paths: Array[String],
- parameters: Map[String, String],
- maybeSchema: Option[StructType] = None,
- maybePartitionSpec: Option[PartitionSpec] = None)(
+ maybeDataSchema: Option[StructType],
+ maybePartitionSpec: Option[PartitionSpec],
+ override val userDefinedPartitionColumns: Option[StructType],
+ parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {
- override val dataSchema: StructType = maybeSchema.getOrElse {
+ private[sql] def this(
+ paths: Array[String],
+ maybeDataSchema: Option[StructType],
+ maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])(
+ sqlContext: SQLContext) = {
+ this(
+ paths,
+ maybeDataSchema,
+ maybePartitionSpec,
+ maybePartitionSpec.map(_.partitionColumns),
+ parameters)(sqlContext)
+ }
+
+ override val dataSchema: StructType = maybeDataSchema.getOrElse {
OrcFileOperator.readSchema(
paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
}
- override def userDefinedPartitionColumns: Option[StructType] =
- maybePartitionSpec.map(_.partitionColumns)
-
override def needConversion: Boolean = false
override def equals(other: Any): Boolean = other match {
@@ -169,7 +180,7 @@ private[sql] case class OrcRelation(
paths.toSet,
dataSchema,
schema,
- maybePartitionSpec)
+ partitionColumns)
}
override def buildScan(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 30db976a3a..c4c7b63496 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -670,6 +670,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
catalog.createDataSourceTable(
tableName = "wide_schema",
userSpecifiedSchema = Some(schema),
+ partitionColumns = Array.empty[String],
provider = "json",
options = Map("path" -> "just a dummy path"),
isExternal = false)
@@ -705,6 +706,35 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql(s"drop table $tableName")
}
+ test("Saving partition columns information") {
+ val df =
+ sparkContext.parallelize(1 to 10, 4).map { i =>
+ Tuple4(i, i + 1, s"str$i", s"str${i + 1}")
+ }.toDF("a", "b", "c", "d")
+
+ val tableName = s"partitionInfo_${System.currentTimeMillis()}"
+ df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
+ invalidateTable(tableName)
+ val metastoreTable = catalog.client.getTable("default", tableName)
+ val expectedPartitionColumns =
+ StructType(df.schema("d") :: df.schema("b") :: Nil)
+ val actualPartitionColumns =
+ StructType(
+ metastoreTable.partitionColumns.map(c =>
+ StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType))))
+ // Make sure partition columns are correctly stored in metastore.
+ assert(
+ expectedPartitionColumns.sameType(actualPartitionColumns),
+ s"Partitions columns stored in metastore $actualPartitionColumns is not the " +
+ s"partition columns defined by the saveAsTable operation $expectedPartitionColumns.")
+
+ // Check the content of the saved table.
+ checkAnswer(
+ table(tableName).selectExpr("c", "b", "d", "a"),
+ df.selectExpr("c", "b", "d", "a").collect())
+
+ sql(s"drop table $tableName")
+ }
test("insert into a table") {
def createDF(from: Int, to: Int): DataFrame =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1da990bc95..223ba65f47 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -435,9 +435,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
}
test("Caching converted data source Parquet Relations") {
- def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+ def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
- catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
+ catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
@@ -463,30 +463,30 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
+ var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
// First, make sure the converted test_parquet is not cached.
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
- checkCached(tableIdentifer)
+ checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
|select a, b from jt
""".stripMargin)
- checkCached(tableIdentifer)
+ checkCached(tableIdentifier)
// Make sure we can read the data.
checkAnswer(
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
// Create a partitioned table.
sql(
@@ -503,8 +503,8 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -513,18 +513,18 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
""".stripMargin)
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
// So, we expect it is not cached.
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (date='2015-04-02')
|select a, b from jt
""".stripMargin)
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
- checkCached(tableIdentifer)
+ checkCached(tableIdentifier)
// Make sure we can read the data.
checkAnswer(
sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
@@ -536,7 +536,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
""".stripMargin).collect())
invalidateTable("test_parquet_partitioned_cache_test")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql("DROP TABLE test_insert_parquet")
sql("DROP TABLE test_parquet_partitioned_cache_test")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 09eed6646c..2d69b89fd9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -99,7 +99,7 @@ class SimpleTextRelation(
}
override def hashCode(): Int =
- Objects.hashCode(paths, maybeDataSchema, dataSchema)
+ Objects.hashCode(paths, maybeDataSchema, dataSchema, partitionColumns)
override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = {
val fields = dataSchema.map(_.dataType)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index ad4a4826c6..c7c8bcd27f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetTest
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -237,10 +236,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}
- def withTable(tableName: String)(f: => Unit): Unit = {
- try f finally sql(s"DROP TABLE $tableName")
- }
-
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
testDF.write.format(dataSourceName).mode(SaveMode.Overwrite)
.option("dataSchema", dataSchema.json)
@@ -444,6 +439,23 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
checkAnswer(df, partitionedTestDF.collect())
}
}
+
+ test("Partition column type casting") {
+ withTempPath { file =>
+ val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2)
+
+ input
+ .write
+ .format(dataSourceName)
+ .mode(SaveMode.Overwrite)
+ .partitionBy("ps", "p2")
+ .saveAsTable("t")
+
+ withTempTable("t") {
+ checkAnswer(table("t"), input.collect())
+ }
+ }
+ }
}
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -504,4 +516,18 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
.load(file.getCanonicalPath))
}
}
+
+ test("SPARK-7616: adjust column name order accordingly when saving partitioned table") {
+ val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
+
+ df.write
+ .format("parquet")
+ .mode(SaveMode.Overwrite)
+ .partitionBy("c", "a")
+ .saveAsTable("t")
+
+ withTable("t") {
+ checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
+ }
+ }
}