From 64e826f91eabb1a22d3d163d71fbb7b6d2185f25 Mon Sep 17 00:00:00 2001
From: Yadong Qi
Date: Tue, 6 Sep 2016 10:57:21 +0800
Subject: [SPARK-17358][SQL] Cached table (parquet/orc) should be shared
 between beelines

## What changes were proposed in this pull request?

A cached table (parquet/orc) couldn't be shared between beelines, because the `sameResult` method used by `CacheManager` always returns false when comparing two `HadoopFsRelation`s created in different beelines: their `sparkSession` fields differ, so the generated case-class equality never matches. We therefore move `sparkSession` into a second, curried parameter list; Scala derives a case class's `equals`/`hashCode` from the first parameter list only, so the session no longer defeats the comparison.
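For context, here is a minimal, self-contained sketch of the Scala behavior this change relies on. `Session`, `Relation`, and the path are hypothetical stand-ins for illustration, not Spark's actual classes:

```
// Scala derives a case class's equals/hashCode from its *first*
// parameter list only; a field moved into a second (curried) list
// no longer participates in equality.
class Session

case class Relation(path: String)(val session: Session)

object CurriedEqualityDemo extends App {
  val s1 = new Session
  val s2 = new Session

  // The "same" logical relation constructed in two different sessions:
  val r1 = Relation("hdfs://host:9000/src_pqt")(s1)
  val r2 = Relation("hdfs://host:9000/src_pqt")(s2)

  // With `session` in the first parameter list this would print false;
  // curried away, the relations compare equal, so a cache lookup keyed
  // on the relation can hit across sessions.
  println(r1 == r2)  // true
}
```

One side effect, visible in the test change below: the generated `copy` method provides defaults only for the first parameter list, so callers must pass the second list explicitly, as in `r.copy(...)(r.sparkSession)`.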
## How was this patch tested?

Beeline1
```
1: jdbc:hive2://localhost:10000> CACHE TABLE src_pqt;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (5.143 seconds)

1: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
|                                                                                                                                                                                                             plan                                                                                                                                                                                                               |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#49, value#50]
   +- InMemoryRelation [key#49, value#50], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```

Beeline2
```
0: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
|                                                                                                                                                                                                             plan                                                                                                                                                                                                               |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#68, value#69]
   +- InMemoryRelation [key#68, value#69], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```

The second beeline's `EXPLAIN` resolves to the `InMemoryRelation` cached by the first beeline, confirming that the cache entry is now shared across sessions.

Author: Yadong Qi

Closes #14913 from watermen/SPARK-17358.
---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala    | 6 ++----
 .../spark/sql/execution/datasources/fileSourceInterfaces.scala     | 4 ++--
 .../spark/sql/execution/datasources/FileSourceStrategySuite.scala  | 3 ++-
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5968db84cd..9c99a800cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -351,13 +351,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)
+          options)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -409,13 +408,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          caseInsensitiveOptions)
+          caseInsensitiveOptions)(sparkSession)
 
       case _ =>
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e03a2323c7..7e40c35984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -134,13 +134,13 @@ abstract class OutputWriter {
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    sparkSession: SparkSession,
     location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 09fd750180..45411fa065 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -508,7 +508,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       val bucketed = df.queryExecution.analyzed transform {
         case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
           l.copy(relation =
-            r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
+            r.copy(bucketSpec =
+              Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))
       }
       Dataset.ofRows(spark, bucketed)
     } else {
-- cgit v1.2.3