author     Cheng Lian <lian@databricks.com>    2015-05-15 16:20:49 +0800
committer  Cheng Lian <lian@databricks.com>    2015-05-15 16:20:49 +0800
commit     fdf5bba35d201fe0de3901b4d47262c485c76569 (patch)
tree       71bbf6e41c872d3ac16def2faf88383b57c6a7c1 /sql/hive
parent     94761485b207fa1f12a8410a68920300d851bf61 (diff)
[SPARK-7591] [SQL] Partitioning support API tweaks
Please see [SPARK-7591] [1] for the details.

/cc rxin marmbrus yhuai

[1]: https://issues.apache.org/jira/browse/SPARK-7591

Author: Cheng Lian <lian@databricks.com>

Closes #6150 from liancheng/spark-7591 and squashes the following commits:

af422e7 [Cheng Lian] Addresses @rxin's comments
37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization
2fc680a [Cheng Lian] Fixes Scala style issue
189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments
522c24e [Cheng Lian] Adds OutputWriterFactory
047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2
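For orientation before reading the diff: the write-side surface after the rename looks roughly as sketched below. This is reconstructed from the signatures exercised in the patch (createRelation, prepareJobForWrite, newInstance, write, close); it is an illustration of the shape of the API, not the authoritative trait definitions in org.apache.spark.sql.sources, and modifiers, doc comments, and other overloads are omitted.

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType

// Formerly FSBasedRelationProvider: resolves a HadoopFsRelation from paths and options.
trait HadoopFsRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      paths: Array[String],
      schema: Option[StructType],
      partitionColumns: Option[StructType],
      parameters: Map[String, String]): HadoopFsRelation
}

// Formerly FSBasedRelation: the constructor arguments are gone; paths and
// user-defined partition columns are now overridable members.
abstract class HadoopFsRelation {
  def paths: Array[String]
  def userDefinedPartitionColumns: Option[StructType]
  def dataSchema: StructType
  def buildScan(inputPaths: Array[String]): RDD[Row]
  def prepareJobForWrite(job: Job): OutputWriterFactory   // added by this patch
}

// Added by this patch: produced once per write job, builds one OutputWriter per task.
abstract class OutputWriterFactory {
  def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
}

// OutputWriter loses its mutable init(); construction now happens in newInstance.
abstract class OutputWriter {
  def write(row: Row): Unit
  def close(): Unit
}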
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala        12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala           2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala    6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala      8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala               20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala       47
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala)  8
7 files changed, 50 insertions, 53 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0e82c8d03..2aa80b47a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
- FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+ ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new FSBasedParquetRelation(
+ new ParquetRelation2(
paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
+ new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 8e405e0804..6609763343 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
- case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+ case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index da5d203d9d..1bf1c1be3e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: FSBasedParquetRelation) => // OK
+ case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+ s"${classOf[ParquetRelation2].getCanonicalName}")
}
// Clenup and reset confs.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5c7152e214..dfe73c62c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
@@ -175,17 +175,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: FSBasedParquetRelation) =>
+ case LogicalRelation(r: ParquetRelation2) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
- s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
+ s"${ParquetRelation2.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
- s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
+ s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 41bcbe84b0..b6be09e2f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
+import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils
@@ -291,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: FSBasedParquetRelation) => // OK
+ case LogicalRelation(_: ParquetRelation2) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+ s"${classOf[ParquetRelation2].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -315,9 +315,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+ s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@@ -345,9 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+ s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@@ -378,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: FSBasedParquetRelation) => r
+ case r @ LogicalRelation(_: ParquetRelation2) => r
}.size
}
@@ -390,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 8801aba2f6..29b21586f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -24,7 +24,7 @@ import com.google.common.base.Objects
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -32,17 +32,16 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
/**
- * A simple example [[FSBasedRelationProvider]].
+ * A simple example [[HadoopFsRelationProvider]].
*/
-class SimpleTextSource extends FSBasedRelationProvider {
+class SimpleTextSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
- parameters: Map[String, String]): FSBasedRelation = {
- val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
- new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
+ parameters: Map[String, String]): HadoopFsRelation = {
+ new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext)
}
}
@@ -59,38 +58,30 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
}
}
-class SimpleTextOutputWriter extends OutputWriter {
- private var recordWriter: RecordWriter[NullWritable, Text] = _
- private var taskAttemptContext: TaskAttemptContext = _
-
- override def init(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): Unit = {
- recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
- taskAttemptContext = context
- }
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+ private val recordWriter: RecordWriter[NullWritable, Text] =
+ new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
override def write(row: Row): Unit = {
val serialized = row.toSeq.map(_.toString).mkString(",")
recordWriter.write(null, new Text(serialized))
}
- override def close(): Unit = recordWriter.close(taskAttemptContext)
+ override def close(): Unit = recordWriter.close(context)
}
/**
- * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma
+ * A simple example [[HadoopFsRelation]], used for testing purposes. Data are stored as comma
* separated string lines. When scanning data, schema must be explicitly provided via data source
* option `"dataSchema"`.
*/
class SimpleTextRelation(
- paths: Array[String],
+ override val paths: Array[String],
val maybeDataSchema: Option[StructType],
- partitionsSchema: StructType,
+ override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
- extends FSBasedRelation(paths, partitionsSchema) {
+ extends HadoopFsRelation {
import sqlContext.sparkContext
@@ -110,9 +101,6 @@ class SimpleTextRelation(
override def hashCode(): Int =
Objects.hashCode(paths, maybeDataSchema, dataSchema)
- override def outputWriterClass: Class[_ <: OutputWriter] =
- classOf[SimpleTextOutputWriter]
-
override def buildScan(inputPaths: Array[String]): RDD[Row] = {
val fields = dataSchema.map(_.dataType)
@@ -122,4 +110,13 @@ class SimpleTextRelation(
}: _*)
}
}
+
+ override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context)
+ }
+ }
}
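The division of labour implied by the new contract above, sketched as hypothetical driver/executor-side helpers. The object and method names (WritePathSketch, writeJob, writeTask), outputPath, and the row iterator are illustrative and not part of the patch; the sketch assumes the types live in org.apache.spark.sql.sources, as in the test file above.

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

object WritePathSketch {
  // Driver side: the relation prepares the Hadoop job once and returns a factory
  // that is shipped to the executors.
  def writeJob(relation: HadoopFsRelation, job: Job): OutputWriterFactory =
    relation.prepareJobForWrite(job)

  // Executor side: each write task constructs its own writer via the factory,
  // which is why OutputWriter no longer needs the old mutable init(path, schema, context).
  def writeTask(
      factory: OutputWriterFactory,
      outputPath: String,
      dataSchema: StructType,
      context: TaskAttemptContext,
      rows: Iterator[Row]): Unit = {
    val writer: OutputWriter = factory.newInstance(outputPath, dataSchema, context)
    try rows.foreach(writer.write) finally writer.close()
  }
}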
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 394833f229..cf6afd25ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
// TODO Don't extend ParquetTest
// This test suite extends ParquetTest for some convenient utility methods. These methods should be
// moved to some more general places, maybe QueryTest.
-class FSBasedRelationTest extends QueryTest with ParquetTest {
+class HadoopFsRelationTest extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestHive
import sqlContext._
@@ -487,7 +487,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest {
}
val actualPaths = df.queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: FSBasedRelation) =>
+ case LogicalRelation(relation: HadoopFsRelation) =>
relation.paths.toSet
}.getOrElse {
fail("Expect an FSBasedRelation, but none could be found")
@@ -499,7 +499,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest {
}
}
-class SimpleTextRelationSuite extends FSBasedRelationTest {
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
import sqlContext._
@@ -530,7 +530,7 @@ class SimpleTextRelationSuite extends FSBasedRelationTest {
}
}
-class FSBasedParquetRelationSuite extends FSBasedRelationTest {
+class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
import sqlContext._