author     Yin Huai <yhuai@databricks.com>    2015-05-21 13:51:40 -0700
committer  Yin Huai <yhuai@databricks.com>    2015-05-21 13:51:40 -0700
commit     30f3f556f7161a49baf145c0cbba8c088b512a6a (patch)
tree       974c7a1670b771f90863840a4cb9704ff8d9421c /sql/core
parent     311fab6f1b00db1a581d77be5196dd045f93d83d (diff)
[SPARK-7763] [SPARK-7616] [SQL] Persists partition columns into metastore
Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Lian <lian@databricks.com>

Closes #6285 from liancheng/spark-7763 and squashes the following commits:

bb2829d [Yin Huai] Fix hashCode.
d677f7d [Cheng Lian] Fixes Scala style issue
44b283f [Cheng Lian] Adds test case for SPARK-7616
6733276 [Yin Huai] Fix a bug that potentially causes https://issues.apache.org/jira/browse/SPARK-7616.
6cabf3c [Yin Huai] Update unit test.
7e02910 [Yin Huai] Use metastore partition columns and do not hijack maybePartitionSpec.
e9a03ec [Cheng Lian] Persists partition columns into metastore
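For context, a minimal sketch (not part of this commit) of the user-facing workflow that depends on partition columns being persisted: it assumes a HiveContext bound to sqlContext, and the table name is hypothetical.

    import sqlContext.implicits._

    // Write a partitioned table through the data source API; with this patch, the
    // partition column "year" should be recorded in the metastore with the table.
    val df = Seq((1, "a", "2015"), (2, "b", "2016")).toDF("id", "value", "year")

    df.write
      .format("parquet")
      .partitionBy("year")
      .saveAsTable("partitioned_t")

    // Read back through the metastore; the partition column is usable in filters.
    sqlContext.table("partitioned_t").filter("year = '2015'").show()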
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala   | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala     |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala          | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala   | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala    |  7
5 files changed, 70 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c35b7eff82..32986aa3ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -49,8 +49,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
- val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
- new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+ new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext)
}
}
@@ -118,12 +117,28 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
private[sql] class ParquetRelation2(
override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
+ // This is for metastore conversion.
private val maybePartitionSpec: Option[PartitionSpec],
+ override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {
+ private[sql] def this(
+ paths: Array[String],
+ maybeDataSchema: Option[StructType],
+ maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])(
+ sqlContext: SQLContext) = {
+ this(
+ paths,
+ maybeDataSchema,
+ maybePartitionSpec,
+ maybePartitionSpec.map(_.partitionColumns),
+ parameters)(sqlContext)
+ }
+
// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
@@ -161,7 +176,7 @@ private[sql] class ParquetRelation2(
Boolean.box(shouldMergeSchemas),
paths.toSet,
maybeDataSchema,
- maybePartitionSpec)
+ partitionColumns)
} else {
Objects.hashCode(
Boolean.box(shouldMergeSchemas),
@@ -169,7 +184,7 @@ private[sql] class ParquetRelation2(
dataSchema,
schema,
maybeDataSchema,
- maybePartitionSpec)
+ partitionColumns)
}
}
@@ -185,9 +200,6 @@ private[sql] class ParquetRelation2(
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
- override def userDefinedPartitionColumns: Option[StructType] =
- maybePartitionSpec.map(_.partitionColumns)
-
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d54dbb0831..498f7538d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -93,6 +93,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
job.setOutputValueClass(classOf[Row])
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+ // We create a DataFrame by applying the schema of the relation to the data to make sure
+ // we are writing data based on the expected schema.
val df = sqlContext.createDataFrame(
DataFrame(sqlContext, query).queryExecution.toRdd,
relation.schema,
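The comment added above describes re-applying the relation's schema to the query output before writing. A minimal standalone sketch of that idea, with an illustrative schema and data rather than the actual names used in InsertIntoHadoopFsRelation:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Schema expected by the relation being written to (hypothetical).
    val expectedSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("value", StringType)))

    // Re-apply the expected schema to the raw Row output of a query.
    val rows = sqlContext.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
    val reapplied = sqlContext.createDataFrame(rows, expectedSchema)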
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index a13ab74852..5e723122ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.RunnableCommand
@@ -245,12 +245,13 @@ private[sql] object ResolvedDataSource {
SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
}
- val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name)))
+ val dataSchema =
+ StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
dataSource.createRelation(
sqlContext,
paths,
- Some(schema),
+ Some(dataSchema),
maybePartitionsSchema,
caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
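The dataSchema computed above is the user-specified schema with the partition columns filtered out (asNullable is internal to Spark SQL). A small illustration of the same split on a plain StructType, with made-up column names:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val fullSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("value", StringType),
      StructField("year", StringType)))

    val partitionColumns = Seq("year")

    // Data columns are the user-specified schema minus the partition columns.
    val dataSchema = StructType(fullSchema.filterNot(f => partitionColumns.contains(f.name)))
    // dataSchema now contains only "id" and "value".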
@@ -320,10 +321,20 @@ private[sql] object ResolvedDataSource {
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
+
+ // For partitioned relation r, r.schema's column ordering can be different from the column
+ // ordering of data.logicalPlan. We need a Project to adjust the ordering,
+ // so that inside InsertIntoHadoopFsRelation, we can safely apply r.schema to
+ // the data.
+ val project =
+ Project(
+ r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
+ data.logicalPlan)
+
sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
- data.logicalPlan,
+ project,
partitionColumns.toArray,
mode)).toRdd
r
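At the DataFrame level, the Project built above amounts to selecting the input columns in the order given by the relation's schema, since a partitioned relation lists data columns first and partition columns last. A hedged sketch of that reordering as a helper (the function name is made up):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.StructType

    // Reorder the columns of `data` to match a target schema, mirroring what the
    // Project of UnresolvedAttributes does at the logical-plan level.
    def reorderToSchema(data: DataFrame, targetSchema: StructType): DataFrame =
      data.select(targetSchema.fieldNames.map(data.col): _*)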
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index fcbac0d457..61fc4e5c19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.SerializableWritable
-import org.apache.spark.sql._
+import org.apache.spark.sql.{Row, _}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.{StructField, StructType}
@@ -120,11 +120,13 @@ trait HadoopFsRelationProvider {
* Returns a new base relation with the given parameters, a user defined schema, and a list of
* partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
* is enforced by the Map that is passed to the function.
+ *
+ * @param dataSchema Schema of data columns (i.e., columns that are not partition columns).
*/
def createRelation(
sqlContext: SQLContext,
paths: Array[String],
- schema: Option[StructType],
+ dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation
}
@@ -416,8 +418,29 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
final private[sql] def partitionSpec: PartitionSpec = {
if (_partitionSpec == null) {
_partitionSpec = maybePartitionSpec
- .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
- .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+ .flatMap {
+ case spec if spec.partitions.nonEmpty =>
+ Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+ case _ =>
+ None
+ }
+ .orElse {
+ // We only know the partition columns and their data types. We need to discover
+ // partition values.
+ userDefinedPartitionColumns.map { partitionSchema =>
+ val spec = discoverPartitions()
+ val castedPartitions = spec.partitions.map { case p @ Partition(values, path) =>
+ val literals = values.toSeq.zip(spec.partitionColumns.map(_.dataType)).map {
+ case (value, dataType) => Literal.create(value, dataType)
+ }
+ val castedValues = partitionSchema.zip(literals).map { case (field, literal) =>
+ Cast(literal, field.dataType).eval()
+ }
+ p.copy(values = Row.fromSeq(castedValues))
+ }
+ PartitionSpec(partitionSchema, castedPartitions)
+ }
+ }
.getOrElse {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
discoverPartitions()
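The new branch above builds a PartitionSpec from the user-defined partition columns by discovering partition values from paths and casting them to the declared types with Catalyst's Literal and Cast. A minimal sketch of that casting step in isolation (these are Catalyst internals, and the values are made up):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // A value parsed from a path such as .../year=2015/ may be carried as a string,
    // while the metastore declares the column as IntegerType; cast before building the Row.
    val discovered = Literal.create("2015", StringType)
    val casted = Cast(discovered, IntegerType).eval()   // 2015 as an Int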
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 75d290625e..ca66cdc482 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -78,4 +78,11 @@ trait SQLTestUtils {
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}
+
+ /**
+ * Drops table `tableName` after calling `f`.
+ */
+ protected def withTable(tableName: String)(f: => Unit): Unit = {
+ try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+ }
}
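A possible use of the new helper in a test, assuming a HiveContext-backed sqlContext and a made-up table name:

    withTable("test_table") {
      sqlContext.sql("CREATE TABLE test_table (key INT, value STRING)")
      assert(sqlContext.table("test_table").schema.fieldNames.toSeq == Seq("key", "value"))
    }
    // The table is dropped here even if the body above throws.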