aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-06-23 11:48:48 -0700
committerDavies Liu <davies.liu@gmail.com>2016-06-23 11:48:48 -0700
commit10396d9505c752cc18b6424f415d4ff0f460ad65 (patch)
treeb2f45ba2c96e182d2fad139d022651dbfe88494a /sql/core
parent60398dabc50d402bbab4190fbe94ebed6d3a48dc (diff)
downloadspark-10396d9505c752cc18b6424f415d4ff0f460ad65.tar.gz
spark-10396d9505c752cc18b6424f415d4ff0f460ad65.tar.bz2
spark-10396d9505c752cc18b6424f415d4ff0f460ad65.zip
[SPARK-16163] [SQL] Cache the statistics for logical plans
## What changes were proposed in this pull request? This calculation of statistics is not trivial anymore, it could be very slow on large query (for example, TPC-DS Q64 took several minutes to plan). During the planning of a query, the statistics of any logical plan should not change (even InMemoryRelation), so we should use `lazy val` to cache the statistics. For InMemoryRelation, the statistics could be updated after materialization, it's only useful when used in another query (before planning), because once we finished the planning, the statistics will not be used anymore. ## How was this patch tested? Testsed with TPC-DS Q64, it could be planned in a second after the patch. Author: Davies Liu <davies@databricks.com> Closes #13871 from davies/fix_statistics.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala65
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala3
2 files changed, 20 insertions, 48 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index c546d4bc97..02866c76cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -63,58 +63,32 @@ private[sql] case class InMemoryRelation(
@transient child: SparkPlan,
tableName: Option[String])(
@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
- @transient private[sql] var _statistics: Statistics = null,
- private[sql] var _batchStats: CollectionAccumulator[InternalRow] = null)
+ private[sql] val batchStats: CollectionAccumulator[InternalRow] =
+ child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
extends logical.LeafNode with MultiInstanceRelation {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
override def producedAttributes: AttributeSet = outputSet
- private[sql] val batchStats: CollectionAccumulator[InternalRow] =
- if (_batchStats == null) {
- child.sqlContext.sparkContext.collectionAccumulator[InternalRow]
- } else {
- _batchStats
- }
-
@transient val partitionStatistics = new PartitionStatistics(output)
- private def computeSizeInBytes = {
- val sizeOfRow: Expression =
- BindReferences.bindReference(
- output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
- partitionStatistics.schema)
-
- batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
- }
-
- // Statistics propagation contracts:
- // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data
- // 2. Only propagate statistics when `_statistics` is non-null
- private def statisticsToBePropagated = if (_statistics == null) {
- val updatedStats = statistics
- if (_statistics == null) null else updatedStats
- } else {
- _statistics
- }
-
- override def statistics: Statistics = {
- if (_statistics == null) {
- if (batchStats.value.isEmpty) {
- // Underlying columnar RDD hasn't been materialized, no useful statistics information
- // available, return the default statistics.
- Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
- } else {
- // Underlying columnar RDD has been materialized, required information has also been
- // collected via the `batchStats` accumulator, compute the final statistics,
- // and update `_statistics`.
- _statistics = Statistics(sizeInBytes = computeSizeInBytes)
- _statistics
- }
+ override lazy val statistics: Statistics = {
+ if (batchStats.value.isEmpty) {
+ // Underlying columnar RDD hasn't been materialized, no useful statistics information
+ // available, return the default statistics.
+ Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
- // Pre-computed statistics
- _statistics
+ // Underlying columnar RDD has been materialized, required information has also been
+ // collected via the `batchStats` accumulator.
+ val sizeOfRow: Expression =
+ BindReferences.bindReference(
+ output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+ partitionStatistics.schema)
+
+ val sizeInBytes =
+ batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+ Statistics(sizeInBytes = sizeInBytes)
}
}
@@ -187,7 +161,7 @@ private[sql] case class InMemoryRelation(
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
InMemoryRelation(
newOutput, useCompression, batchSize, storageLevel, child, tableName)(
- _cachedColumnBuffers, statisticsToBePropagated, batchStats)
+ _cachedColumnBuffers, batchStats)
}
override def newInstance(): this.type = {
@@ -199,12 +173,11 @@ private[sql] case class InMemoryRelation(
child,
tableName)(
_cachedColumnBuffers,
- statisticsToBePropagated,
batchStats).asInstanceOf[this.type]
}
def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
override protected def otherCopyArgs: Seq[AnyRef] =
- Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
+ Seq(_cachedColumnBuffers, batchStats)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 742f036e55..b15f38c2a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -323,8 +323,7 @@ abstract class QueryTest extends PlanTest {
origin.child,
l.tableName)(
origin.cachedColumnBuffers,
- l._statistics,
- origin._batchStats)
+ origin.batchStats)
case p =>
p.transformExpressions {
case s: SubqueryExpression =>