path: root/sql/core
author     Zongheng Yang <zongheng.y@gmail.com>        2014-07-29 15:32:50 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-07-29 15:32:50 -0700
commit     c7db274be79f448fda566208946cb50958ea9b1a (patch)
tree       d45aa61c5db53de2b6ef50eafae40e6cab73c80f /sql/core
parent     dc9653641f8806960d79652afa043c3fb84f25d2 (diff)
download   spark-c7db274be79f448fda566208946cb50958ea9b1a.tar.gz
           spark-c7db274be79f448fda566208946cb50958ea9b1a.tar.bz2
           spark-c7db274be79f448fda566208946cb50958ea9b1a.zip
[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.
The idea is that every Catalyst logical plan gets hold of a Statistics class, which provides useful estimates of various statistics; see the implementation in `MetastoreRelation` for an example. This patch also includes several uses of the estimation interface in the planner. For instance, we now use physical table sizes from the estimation interface to convert an equi-join to a broadcast join when doing so is beneficial, as determined by a size threshold.

Finally, there are a couple of minor accompanying changes:
- Remove the unused `BaseRelation`.
- Make SparkLogicalPlan take a `SQLContext` in its second parameter list.

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1238 from concretevitamin/estimates and squashes the following commits:

329071d [Zongheng Yang] Address review comments; turn config name from string to field in SQLConf.
8663e84 [Zongheng Yang] Use BigInt for stat; for logical leaves, by default throw an exception.
2f2fb89 [Zongheng Yang] Fix statistics for SparkLogicalPlan.
9951305 [Zongheng Yang] Remove childrenStats.
16fc60a [Zongheng Yang] Avoid calling statistics on plans if auto join conversion is disabled.
8bd2816 [Zongheng Yang] Add a note on performance of statistics.
6e594b8 [Zongheng Yang] Get size info from metastore for MetastoreRelation.
01b7a3e [Zongheng Yang] Update scaladoc for a field and move it to @param section.
549061c [Zongheng Yang] Remove numTuples in Statistics for now.
729a8e2 [Zongheng Yang] Update docs to be more explicit.
573e644 [Zongheng Yang] Remove singleton SQLConf and move back `settings` to the trait.
2d99eb5 [Zongheng Yang] {Cleanup, use synchronized in, enrich} StatisticsSuite.
ca5b825 [Zongheng Yang] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
43d38a6 [Zongheng Yang] Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
0ef9e5b [Zongheng Yang] Use multiplication instead of sum for default estimates.
4ef0d26 [Zongheng Yang] Make Statistics a case class.
3ba8f3e [Zongheng Yang] Add comment.
e5bcf5b [Zongheng Yang] Fix optimization conditions & update scala docs to explain.
7d9216a [Zongheng Yang] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
73cde01 [Zongheng Yang] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
73412be [Zongheng Yang] Move SQLConf to Catalyst & add default val for sizeInBytes.
7a60ab7 [Zongheng Yang] s/Estimates/Statistics, s/cardinality/numTuples.
de3ae13 [Zongheng Yang] Add parquetAfter() properly in test.
dcff9bd [Zongheng Yang] Cleanups.
84301a4 [Zongheng Yang] Refactors.
5bf5586 [Zongheng Yang] Typo.
56a8e6e [Zongheng Yang] Prototype impl of estimations for Catalyst logical plans.
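As a rough illustration of the configuration surface this patch introduces (not part of the patch itself), the broadcast threshold and the fallback size estimate can be tuned either programmatically or via a SET statement. The SQLContext value name `sqlContext` is assumed, and all sizes are in bytes.

import org.apache.spark.sql.SQLConf

// Broadcast the smaller side of an equi-join only if its estimated size is at most 32 MB.
sqlContext.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, (32 * 1024 * 1024).toString)

// Setting the threshold to -1 disables the automatic conversion entirely.
sqlContext.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, "-1")

// Fallback estimate for operators that do not implement a size statistic; it defaults to a
// value above the threshold so such operators are never broadcast by mistake.
sqlContext.set(SQLConf.DEFAULT_SIZE_IN_BYTES, (1024L * 1024 * 1024).toString)

// The same keys can be set through the query interface, as described in the SQLConf scaladoc.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=10000")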
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                    | 61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                 | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala                  |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala              |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala        | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  | 57
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala               | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala    |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala                  |  2
10 files changed, 101 insertions, 81 deletions
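For context on the diff that follows: the SparkPlan.scala hunk further below shows SparkLogicalPlan supplying only a default size estimate. The sketch here (hypothetical, not part of this commit; `SizedRelation` and `sizeOnDisk` are made-up names) shows how a custom logical leaf operator could plug a real estimate into the same mechanism, assuming the `Statistics` case class added on the Catalyst side is inherited from LogicalPlan, as it appears to be in SparkLogicalPlan.

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// A leaf relation that already knows its physical size, e.g. from file metadata.
case class SizedRelation(output: Seq[Attribute], sizeOnDisk: Long) extends LeafNode {
  override def references = Set.empty
  // Planner rules such as the HashJoin strategy compare this value against
  // spark.sql.autoBroadcastJoinThreshold when choosing the build/broadcast side.
  @transient override lazy val statistics = Statistics(sizeInBytes = BigInt(sizeOnDisk))
}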
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 41920c00b5..be8d4e15ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -21,17 +21,31 @@ import java.util.Properties
import scala.collection.JavaConverters._
+object SQLConf {
+ val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
+ val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+ val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
+
+ object Deprecated {
+ val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+ }
+}
+
/**
- * SQLConf holds mutable config parameters and hints. These can be set and
- * queried either by passing SET commands into Spark SQL's DSL
- * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * A trait that enables the setting and getting of mutable config parameters/hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried by passing SET commands
+ * into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
+ * modify the hints by programmatically calling the setters and getters of this trait.
*
- * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
+ * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
trait SQLConf {
import SQLConf._
+ @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
+ new java.util.HashMap[String, String]())
+
/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
@@ -40,28 +54,33 @@ trait SQLConf {
/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
- * a broadcast value during the physical executions of join operations. Setting this to 0
+ * a broadcast value during the physical executions of join operations. Setting this to -1
* effectively disables auto conversion.
- * Hive setting: hive.auto.convert.join.noconditionaltask.size.
+ *
+ * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
*/
- private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
+ private[spark] def autoBroadcastJoinThreshold: Int =
+ get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
- /** A comma-separated list of table names marked to be broadcasted during joins. */
- private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
+ /**
+ * The default size in bytes to assign to a logical operator's estimation statistics. By default,
+ * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
+ * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
+ */
+ private[spark] def defaultSizeInBytes: Long =
+ getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
/** ********************** SQLConf functionality methods ************ */
- @transient
- private val settings = java.util.Collections.synchronizedMap(
- new java.util.HashMap[String, String]())
-
def set(props: Properties): Unit = {
- props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+ settings.synchronized {
+ props.asScala.foreach { case (k, v) => settings.put(k, v) }
+ }
}
def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
- require(value != null, s"value cannot be null for $key")
+ require(value != null, s"value cannot be null for key: $key")
settings.put(key, value)
}
@@ -90,13 +109,3 @@ trait SQLConf {
}
}
-
-object SQLConf {
- val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
- val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
- val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
-
- object Deprecated {
- val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c178dad662..a136c7b3ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
- new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
+ new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
/**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
- new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+ new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
/**
* :: Experimental ::
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
- val name = tableName
- val newPlan = rdd.logicalPlan transform {
- case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
- }
- catalog.registerTable(None, tableName, newPlan)
+ catalog.registerTable(None, tableName, rdd.logicalPlan)
}
/**
@@ -212,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
- catalog.registerTable(None, tableName, SparkLogicalPlan(e))
+ catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
case inMem: InMemoryRelation =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
@@ -405,7 +401,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
}
}
- new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+ new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 019ff9d300..172b6e0e7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -418,7 +418,8 @@ class SchemaRDD(
* @group schema
*/
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
- new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
+ new SchemaRDD(sqlContext,
+ SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
}
// =======================================================================
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fe81721943..fd751031b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
queryExecution.toRdd
- SparkLogicalPlan(queryExecution.executedPlan)
+ SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
case _ =>
baseLogicalPlan
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 790d9ef22c..806097c917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
}
}
- new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+ new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
}
/**
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* @group userf
*/
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
- new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+ new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 27dc091b85..77c874d031 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{Logging, Row, SQLContext}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
/**
@@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
* linking.
*/
@DeveloperApi
-case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
- extends BaseRelation with MultiInstanceRelation {
+case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
+ extends LogicalPlan with MultiInstanceRelation {
def output = alreadyPlanned.output
override def references = Set.empty
@@ -78,9 +78,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case _ => sys.error("Multiple instance of the same relation detected.")
- }, tableName)
- .asInstanceOf[this.type]
+ })(sqlContext).asInstanceOf[this.type]
}
+
+ @transient override lazy val statistics = Statistics(
+ // TODO: Instead of returning a default value here, find a way to return a meaningful size
+ // estimate for RDDs. See PR 1238 for more discussions.
+ sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+ )
+
}
private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c078e71fe0..404d48ae05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.execution
+import scala.util.Try
+
import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.parquet._
@@ -47,9 +49,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
/**
* Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
+ *
+ * This strategy applies a simple optimization based on the estimates of the physical sizes of
+ * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an
+ * estimated physical size smaller than the user-settable threshold
+ * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
+ * ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
+ * ''broadcasted'' to all of the executors involved in the join, as a
+ * [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
+ * will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
*/
object HashJoin extends Strategy with PredicateHelper {
- private[this] def broadcastHashJoin(
+ private[this] def makeBroadcastHashJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
left: LogicalPlan,
@@ -61,33 +72,27 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}
- def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer
-
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case ExtractEquiJoinKeys(
- Inner,
- leftKeys,
- rightKeys,
- condition,
- left,
- right @ PhysicalOperation(_, _, b: BaseRelation))
- if broadcastTables.contains(b.tableName) =>
- broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
+ case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
+ if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+ right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
+ makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
- case ExtractEquiJoinKeys(
- Inner,
- leftKeys,
- rightKeys,
- condition,
- left @ PhysicalOperation(_, _, b: BaseRelation),
- right)
- if broadcastTables.contains(b.tableName) =>
- broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
+ case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
+ if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+ left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
+ makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
+ val buildSide =
+ if (Try(right.statistics.sizeInBytes <= left.statistics.sizeInBytes).getOrElse(false)) {
+ BuildRight
+ } else {
+ BuildLeft
+ }
val hashJoin =
execution.ShuffledHashJoin(
- leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+ leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case _ => Nil
@@ -273,8 +278,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Limit(limit, planLater(child))(sqlContext) :: Nil
case Unions(unionChildren) =>
execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
- case logical.Except(left,right) =>
- execution.Except(planLater(left),planLater(right)) :: Nil
+ case logical.Except(left,right) =>
+ execution.Except(planLater(left),planLater(right)) :: Nil
case logical.Intersect(left, right) =>
execution.Intersect(planLater(left), planLater(right)) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
@@ -283,7 +288,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
- case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
+ case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index b48c70ee73..6c2b553bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -28,11 +28,12 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.{SQLContext, Logging}
private[sql] object JsonRDD extends Logging {
private[sql] def inferSchema(
+ sqlContext: SQLContext,
json: RDD[String],
samplingRatio: Double = 1.0): LogicalPlan = {
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
@@ -40,15 +41,17 @@ private[sql] object JsonRDD extends Logging {
val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
val baseSchema = createSchema(allKeys)
- createLogicalPlan(json, baseSchema)
+ createLogicalPlan(json, baseSchema, sqlContext)
}
private def createLogicalPlan(
json: RDD[String],
- baseSchema: StructType): LogicalPlan = {
+ baseSchema: StructType,
+ sqlContext: SQLContext): LogicalPlan = {
val schema = nullTypeToStringType(baseSchema)
- SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
+ SparkLogicalPlan(
+ ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext)
}
private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 9c4771d1a9..8c7dbd5eb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -27,6 +27,7 @@ import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.metadata.CompressionCodecName
import parquet.schema.MessageType
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
@@ -45,7 +46,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
*/
private[sql] case class ParquetRelation(
path: String,
- @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
+ @transient conf: Option[Configuration] = None)
+ extends LeafNode with MultiInstanceRelation {
self: Product =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index e17ecc87fd..025c396ef0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
class JoinSuite extends QueryTest {