author    Michael Armbrust <michael@databricks.com>  2014-08-27 15:14:08 -0700
committer Michael Armbrust <michael@databricks.com>  2014-08-27 15:14:08 -0700
commit    7d2a7a91f263bb9fbf24dc4dbffde8fe5e2c7442 (patch)
tree      d42df0d5dfe23d3e4cea4f833a863afa1a0e6d13 /sql
parent    65253502b913f390b26b9b631380b2c6cf1ccdf7 (diff)
[SPARK-3235][SQL] Ensure in-memory tables don't always broadcast.
Author: Michael Armbrust <michael@databricks.com>

Closes #2147 from marmbrus/inMemDefaultSize and squashes the following commits:

5390360 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into inMemDefaultSize
14204d3 [Michael Armbrust] Set the context before creating SparkLogicalPlans.
8da4414 [Michael Armbrust] Make sure we throw errors when leaf nodes fail to provide statistics
18ce029 [Michael Armbrust] Ensure in-memory tables don't always broadcast.
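Why cached tables were always broadcast: InMemoryRelation has no logical children, and LogicalPlan previously estimated sizeInBytes as the product of its children's sizes; the product over an empty sequence is 1, so every cached table looked one byte large and always fell under the broadcast threshold. A minimal sketch of the decision this patch targets, with hypothetical stand-in types rather than the real Catalyst classes:

object BroadcastSketch extends App {
  case class Statistics(sizeInBytes: BigInt)
  trait Plan { def statistics: Statistics }

  // Broadcast a join side only when its estimated size is at or below the
  // threshold (spark.sql.autoBroadcastJoinThreshold).
  def shouldBroadcast(side: Plan, autoBroadcastJoinThreshold: Long): Boolean =
    side.statistics.sizeInBytes <= BigInt(autoBroadcastJoinThreshold)

  // The root cause in one line: a product over zero children is 1, so a
  // cached table with no logical children looked one byte large.
  assert(Seq.empty[BigInt].product == BigInt(1))
}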
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala  | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                              |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala      |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                     |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala     |  8
5 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8616ac45b0..f81d911194 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -41,9 +41,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   case class Statistics(
     sizeInBytes: BigInt
   )
-  lazy val statistics: Statistics = Statistics(
-    sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
-  )
+  lazy val statistics: Statistics = {
+    if (children.size == 0) {
+      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
+    }
+
+    Statistics(
+      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+  }
 
   /**
    * Returns the set of attributes that this node takes as
@@ -117,9 +122,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  */
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
-
-  override lazy val statistics: Statistics =
-    throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
 }
 
 /**
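The two hunks above in one self-contained sketch (stand-in classes, not the real Catalyst types): inner nodes still estimate size as the product of their children's estimates, while a zero-child node that has not overridden statistics now fails fast instead of silently returning the empty product:

object StatisticsSketch extends App {
  case class Statistics(sizeInBytes: BigInt)

  abstract class Node {
    def children: Seq[Node]
    def nodeName: String = getClass.getSimpleName
    lazy val statistics: Statistics = {
      if (children.isEmpty) {
        // A zero-child node that reaches this default has no estimate to offer.
        throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
      }
      Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
    }
  }

  case class Table(size: BigInt) extends Node {
    val children: Seq[Node] = Nil
    override lazy val statistics: Statistics = Statistics(sizeInBytes = size)
  }
  case class Join(children: Seq[Node]) extends Node

  // Inner nodes multiply child estimates, as in the patched code above.
  assert(Join(Seq(Table(10), Table(20))).statistics.sizeInBytes == BigInt(200))
  // Join(Nil).statistics would now throw instead of reporting size 1.
}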
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6f0eed3f63..a75af94d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -89,8 +89,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
+  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
+    SparkPlan.currentContext.set(self)
     new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
+  }
 
   /**
    * :: DeveloperApi ::
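This change exists because SparkPlan captures its SQLContext from a thread-local at construction time (see the SparkPlan hunk below), so the implicit conversion must publish the context before building the plan. The pattern, reduced to a hedged sketch with simplified stand-in types:

object ContextSketch extends App {
  class Context

  object PlanContext {
    // Plays the role of SparkPlan.currentContext.
    val currentContext = new ThreadLocal[Context]
  }

  class PlanNode {
    // Captured once, at construction time; if the ThreadLocal was never set,
    // this is null and the failure surfaces far from its cause.
    val context: Context = PlanContext.currentContext.get()
  }

  def createPlan(ctx: Context): PlanNode = {
    PlanContext.currentContext.set(ctx) // what createSchemaRDD now does first
    new PlanNode
  }

  assert(createPlan(new Context).context != null)
}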
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 24e88eea31..bc36bacd00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -39,6 +39,9 @@ private[sql] case class InMemoryRelation(
     (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
+  override lazy val statistics =
+    Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+
   // If the cached column buffers were not passed in, we calculate them in the constructor.
   // As in Spark, the actual work of caching is lazy.
   if (_cachedColumnBuffers == null) {
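InMemoryRelation has no logical children, so without this override it would now hit the zero-children guard added to LogicalPlan.statistics; instead it reports the context's conservative default size. A hedged sketch of the intent with a stand-in config type; the +1 is only an illustration, and the property that matters (locked in by the test below) is defaultSizeInBytes > autoBroadcastJoinThreshold:

// Stand-in for the relevant slice of SQLConf, not the real class.
case class Conf(autoBroadcastJoinThreshold: Long) {
  // A size for relations of unknown cardinality that can never qualify
  // for auto-broadcast.
  def defaultSizeInBytes: Long = autoBroadcastJoinThreshold + 1
}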
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 7d33ea5b02..2b8913985b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * populated by the query planning infrastructure.
    */
   @transient
-  protected val sqlContext = SparkPlan.currentContext.get()
+  protected[spark] val sqlContext = SparkPlan.currentContext.get()
 
   protected def sparkContext = sqlContext.sparkContext
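Widening protected to protected[spark] is what lets InMemoryRelation, which lives in a sibling package, read child.sqlContext in the hunk above. A compilable illustration of the qualifier, using hypothetical packages:

package org.apache.spark {
  package sql {
    abstract class Plan {
      // Visible to subclasses and, with the [spark] qualifier, to any code
      // under org.apache.spark, regardless of inheritance.
      protected[spark] val contextName: String = "sqlContext"
    }
  }
  package columnar {
    class Scan(child: org.apache.spark.sql.Plan) {
      def name: String = child.contextName // allowed by protected[spark]
    }
  }
}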
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 736c0f8571..fdd2799a53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -33,6 +33,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }
 
+  test("default size avoids broadcast") {
+    // TODO: Improve this test when we have better statistics
+    sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst")
+    cacheTable("sizeTst")
+    assert(
+      table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold)
+  }
+
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
     val scan = InMemoryRelation(useCompression = true, 5, plan)
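Net effect, as the new test checks: before the patch a cached table's estimate was the empty product, 1 byte, which sits below any broadcast threshold; after it, the conservative default sits above. Illustrative arithmetic under an assumed 10 MB threshold (the real value is whatever spark.sql.autoBroadcastJoinThreshold is configured to):

object NetEffectSketch extends App {
  val threshold = BigInt(10L * 1024 * 1024) // assumed purely for illustration
  val before    = BigInt(1)                 // empty-children product: always broadcast
  val after     = threshold + 1             // conservative default: never auto-broadcast
  assert(before <= threshold && after > threshold)
}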