diff options
author | Zongheng Yang <zongheng.y@gmail.com> | 2014-07-29 15:32:50 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-29 15:32:50 -0700 |
commit | c7db274be79f448fda566208946cb50958ea9b1a (patch) | |
tree | d45aa61c5db53de2b6ef50eafae40e6cab73c80f /sql/core | |
parent | dc9653641f8806960d79652afa043c3fb84f25d2 (diff) | |
download | spark-c7db274be79f448fda566208946cb50958ea9b1a.tar.gz spark-c7db274be79f448fda566208946cb50958ea9b1a.tar.bz2 spark-c7db274be79f448fda566208946cb50958ea9b1a.zip |
[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.
The idea is that every Catalyst logical plan gets hold of a Statistics class, the usage of which provides useful estimations on various statistics. See the implementations of `MetastoreRelation`.
This patch also includes several usages of the estimation interface in the planner. For instance, we now use physical table sizes from the estimate interface to convert an equi-join to a broadcast join (when doing so is beneficial, as determined by a size threshold).
Finally, there are a couple minor accompanying changes including:
- Remove the not-in-use `BaseRelation`.
- Make SparkLogicalPlan take a `SQLContext` in the second param list.
Author: Zongheng Yang <zongheng.y@gmail.com>
Closes #1238 from concretevitamin/estimates and squashes the following commits:
329071d [Zongheng Yang] Address review comments; turn config name from string to field in SQLConf.
8663e84 [Zongheng Yang] Use BigInt for stat; for logical leaves, by default throw an exception.
2f2fb89 [Zongheng Yang] Fix statistics for SparkLogicalPlan.
9951305 [Zongheng Yang] Remove childrenStats.
16fc60a [Zongheng Yang] Avoid calling statistics on plans if auto join conversion is disabled.
8bd2816 [Zongheng Yang] Add a note on performance of statistics.
6e594b8 [Zongheng Yang] Get size info from metastore for MetastoreRelation.
01b7a3e [Zongheng Yang] Update scaladoc for a field and move it to @param section.
549061c [Zongheng Yang] Remove numTuples in Statistics for now.
729a8e2 [Zongheng Yang] Update docs to be more explicit.
573e644 [Zongheng Yang] Remove singleton SQLConf and move back `settings` to the trait.
2d99eb5 [Zongheng Yang] {Cleanup, use synchronized in, enrich} StatisticsSuite.
ca5b825 [Zongheng Yang] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
43d38a6 [Zongheng Yang] Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
0ef9e5b [Zongheng Yang] Use multiplication instead of sum for default estimates.
4ef0d26 [Zongheng Yang] Make Statistics a case class.
3ba8f3e [Zongheng Yang] Add comment.
e5bcf5b [Zongheng Yang] Fix optimization conditions & update scala docs to explain.
7d9216a [Zongheng Yang] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
73cde01 [Zongheng Yang] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
73412be [Zongheng Yang] Move SQLConf to Catalyst & add default val for sizeInBytes.
7a60ab7 [Zongheng Yang] s/Estimates/Statistics, s/cardinality/numTuples.
de3ae13 [Zongheng Yang] Add parquetAfter() properly in test.
dcff9bd [Zongheng Yang] Cleanups.
84301a4 [Zongheng Yang] Refactors.
5bf5586 [Zongheng Yang] Typo.
56a8e6e [Zongheng Yang] Prototype impl of estimations for Catalyst logical plans.
Diffstat (limited to 'sql/core')
10 files changed, 101 insertions, 81 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 41920c00b5..be8d4e15ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -21,17 +21,31 @@ import java.util.Properties import scala.collection.JavaConverters._ +object SQLConf { + val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold" + val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" + val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes" + + object Deprecated { + val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" + } +} + /** - * SQLConf holds mutable config parameters and hints. These can be set and - * queried either by passing SET commands into Spark SQL's DSL - * functions (sql(), hql(), etc.), or by programmatically using setters and - * getters of this class. + * A trait that enables the setting and getting of mutable config parameters/hints. + * + * In the presence of a SQLContext, these can be set and queried by passing SET commands + * into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can + * modify the hints by programmatically calling the setters and getters of this trait. * - * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads). + * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ trait SQLConf { import SQLConf._ + @transient protected[spark] val settings = java.util.Collections.synchronizedMap( + new java.util.HashMap[String, String]()) + /** ************************ Spark SQL Params/Hints ******************* */ // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext? @@ -40,28 +54,33 @@ trait SQLConf { /** * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to - * a broadcast value during the physical executions of join operations. Setting this to 0 + * a broadcast value during the physical executions of join operations. Setting this to -1 * effectively disables auto conversion. - * Hive setting: hive.auto.convert.join.noconditionaltask.size. + * + * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000. */ - private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt + private[spark] def autoBroadcastJoinThreshold: Int = + get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt - /** A comma-separated list of table names marked to be broadcasted during joins. */ - private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "") + /** + * The default size in bytes to assign to a logical operator's estimation statistics. By default, + * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a + * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins. + */ + private[spark] def defaultSizeInBytes: Long = + getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1) /** ********************** SQLConf functionality methods ************ */ - @transient - private val settings = java.util.Collections.synchronizedMap( - new java.util.HashMap[String, String]()) - def set(props: Properties): Unit = { - props.asScala.foreach { case (k, v) => this.settings.put(k, v) } + settings.synchronized { + props.asScala.foreach { case (k, v) => settings.put(k, v) } + } } def set(key: String, value: String): Unit = { require(key != null, "key cannot be null") - require(value != null, s"value cannot be null for $key") + require(value != null, s"value cannot be null for key: $key") settings.put(key, value) } @@ -90,13 +109,3 @@ trait SQLConf { } } - -object SQLConf { - val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size" - val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" - val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables" - - object Deprecated { - val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index c178dad662..a136c7b3ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl.ExpressionConversions -import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.SparkStrategies @@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = - new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))) + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self)) /** * Loads a Parquet file, returning the result as a [[SchemaRDD]]. @@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext) */ @Experimental def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = - new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio)) + new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio)) /** * :: Experimental :: @@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = { - val name = tableName - val newPlan = rdd.logicalPlan transform { - case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name) - } - catalog.registerTable(None, tableName, newPlan) + catalog.registerTable(None, tableName, rdd.logicalPlan) } /** @@ -212,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext) case inMem @ InMemoryRelation(_, _, e: ExistingRdd) => inMem.cachedColumnBuffers.unpersist() catalog.unregisterTable(None, tableName) - catalog.registerTable(None, tableName, SparkLogicalPlan(e)) + catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self)) case inMem: InMemoryRelation => inMem.cachedColumnBuffers.unpersist() catalog.unregisterTable(None, tableName) @@ -405,7 +401,7 @@ class SQLContext(@transient val sparkContext: SparkContext) new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row } } - new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 019ff9d300..172b6e0e7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -418,7 +418,8 @@ class SchemaRDD( * @group schema */ private def applySchema(rdd: RDD[Row]): SchemaRDD = { - new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))) + new SchemaRDD(sqlContext, + SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext)) } // ======================================================================= diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index fe81721943..fd751031b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike { // happen right away to let these side effects take place eagerly. case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile => queryExecution.toRdd - SparkLogicalPlan(queryExecution.executedPlan) + SparkLogicalPlan(queryExecution.executedPlan)(sqlContext) case _ => baseLogicalPlan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 790d9ef22c..806097c917 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) { new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow } } - new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext)) } /** @@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) { * @group userf */ def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = - new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0)) + new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0)) /** * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 27dc091b85..77c874d031 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Logging, Row} +import org.apache.spark.sql.{Logging, Row, SQLContext} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.BaseRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ /** @@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { * linking. */ @DeveloperApi -case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan") - extends BaseRelation with MultiInstanceRelation { +case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext) + extends LogicalPlan with MultiInstanceRelation { def output = alreadyPlanned.output override def references = Set.empty @@ -78,9 +78,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar alreadyPlanned match { case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd) case _ => sys.error("Multiple instance of the same relation detected.") - }, tableName) - .asInstanceOf[this.type] + })(sqlContext).asInstanceOf[this.type] } + + @transient override lazy val statistics = Statistics( + // TODO: Instead of returning a default value here, find a way to return a meaningful size + // estimate for RDDs. See PR 1238 for more discussions. + sizeInBytes = BigInt(sqlContext.defaultSizeInBytes) + ) + } private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c078e71fe0..404d48ae05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution +import scala.util.Try + import org.apache.spark.sql.{SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} import org.apache.spark.sql.parquet._ @@ -47,9 +49,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { /** * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be * evaluated by matching hash keys. + * + * This strategy applies a simple optimization based on the estimates of the physical sizes of + * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an + * estimated physical size smaller than the user-settable threshold + * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the + * ''build'' relation and mark the other relation as the ''stream'' side. The build table will be + * ''broadcasted'' to all of the executors involved in the join, as a + * [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they + * will instead be used to decide the build side in a [[execution.ShuffledHashJoin]]. */ object HashJoin extends Strategy with PredicateHelper { - private[this] def broadcastHashJoin( + private[this] def makeBroadcastHashJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], left: LogicalPlan, @@ -61,33 +72,27 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil } - def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ExtractEquiJoinKeys( - Inner, - leftKeys, - rightKeys, - condition, - left, - right @ PhysicalOperation(_, _, b: BaseRelation)) - if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) + if Try(sqlContext.autoBroadcastJoinThreshold > 0 && + right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) => + makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - case ExtractEquiJoinKeys( - Inner, - leftKeys, - rightKeys, - condition, - left @ PhysicalOperation(_, _, b: BaseRelation), - right) - if broadcastTables.contains(b.tableName) => - broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) + if Try(sqlContext.autoBroadcastJoinThreshold > 0 && + left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) => + makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => + val buildSide = + if (Try(right.statistics.sizeInBytes <= left.statistics.sizeInBytes).getOrElse(false)) { + BuildRight + } else { + BuildLeft + } val hashJoin = execution.ShuffledHashJoin( - leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) + leftKeys, rightKeys, buildSide, planLater(left), planLater(right)) condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil case _ => Nil @@ -273,8 +278,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil - case logical.Except(left,right) => - execution.Except(planLater(left),planLater(right)) :: Nil + case logical.Except(left,right) => + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Intersect(left, right) => execution.Intersect(planLater(left), planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => @@ -283,7 +288,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.ExistingRdd(Nil, singleRowRdd) :: Nil case logical.Repartition(expressions, child) => execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil - case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil + case SparkLogicalPlan(existingPlan) => existingPlan :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index b48c70ee73..6c2b553bb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -28,11 +28,12 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} -import org.apache.spark.sql.Logging +import org.apache.spark.sql.{SQLContext, Logging} private[sql] object JsonRDD extends Logging { private[sql] def inferSchema( + sqlContext: SQLContext, json: RDD[String], samplingRatio: Double = 1.0): LogicalPlan = { require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0") @@ -40,15 +41,17 @@ private[sql] object JsonRDD extends Logging { val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _) val baseSchema = createSchema(allKeys) - createLogicalPlan(json, baseSchema) + createLogicalPlan(json, baseSchema, sqlContext) } private def createLogicalPlan( json: RDD[String], - baseSchema: StructType): LogicalPlan = { + baseSchema: StructType, + sqlContext: SQLContext): LogicalPlan = { val schema = nullTypeToStringType(baseSchema) - SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema)))) + SparkLogicalPlan( + ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext) } private def createSchema(allKeys: Set[(String, DataType)]): StructType = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 9c4771d1a9..8c7dbd5eb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -27,6 +27,7 @@ import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.metadata.CompressionCodecName import parquet.schema.MessageType +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} @@ -45,7 +46,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} */ private[sql] case class ParquetRelation( path: String, - @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation { + @transient conf: Option[Configuration] = None) + extends LeafNode with MultiInstanceRelation { self: Product => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index e17ecc87fd..025c396ef0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext._ class JoinSuite extends QueryTest { |