author    Dongjoon Hyun <dongjoon@apache.org>  2016-06-24 07:19:20 +0800
committer Cheng Lian <lian@databricks.com>     2016-06-24 07:19:20 +0800
commit    264bc63623b20529abcf84abcb333e7c16ad1ef9
tree      09000c0316ba2fafa5af0ea00bbf2ca673e21589
parent    0e4bdebece892edb126fa443f67c846e44e7367e
[SPARK-16165][SQL] Fix the update logic for InMemoryTableScanExec.readBatches
## What changes were proposed in this pull request?

Currently, the `readBatches` accumulator of `InMemoryTableScanExec` is updated only when `spark.sql.inMemoryColumnarStorage.partitionPruning` is true. Although this metric is only used for testing, it should be correct regardless of SQL options.

## How was this patch tested?

Pass the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13870 from dongjoon-hyun/SPARK-16165.
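As a rough illustration of the fixed behavior, here is a minimal standalone test sketch; it is not part of this commit. The suite name, table name, and sizes are hypothetical and chosen to mirror the new test below. It assumes Spark's own test harness (`QueryTest`, `SharedSQLContext`) and must live under `org.apache.spark.sql` because `InMemoryTableScanExec` is `private[sql]`; the internal `spark.sql.inMemoryTableScanStatistics.enable` flag has to be set, since the `readPartitions`/`readBatches` accumulators are only maintained when it is on.

```scala
package org.apache.spark.sql.execution.columnar

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical suite; sizes mirror the new test: 100 rows in 5 partitions
// with a column batch size of 10 gives 2 cached batches per partition,
// i.e. 10 batches in total.
class ReadBatchesMetricSketchSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("readBatches is updated even when partition pruning is disabled") {
    spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10)
    // Internal, test-only flag: accumulators are only kept when it is set.
    spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", "true")
    spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")

    sparkContext.makeRDD(1 to 100, 5).toDF("key").createOrReplaceTempView("pruningData")
    spark.catalog.cacheTable("pruningData")

    val df = sql("SELECT key FROM pruningData WHERE key = 1")
    df.collect()  // materialize the cache and run the scan

    val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
      case scan: InMemoryTableScanExec => (scan.readPartitions.value, scan.readBatches.value)
    }.head
    // With pruning disabled, every partition and every batch is scanned.
    assert(readPartitions === 5)
    assert(readBatches === 10)
  }
}
```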
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala     |  6 +++---
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala | 15 +++++++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 2695f356cd..183e4947b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -147,9 +147,6 @@ private[sql] case class InMemoryTableScanExec(
logInfo(s"Skipping partition based on stats $statsString")
false
} else {
- if (enableAccumulators) {
- readBatches.add(1)
- }
true
}
}
@@ -159,6 +156,9 @@ private[sql] case class InMemoryTableScanExec(
 
       // update SQL metrics
       val withMetrics = cachedBatchesToScan.map { batch =>
+        if (enableAccumulators) {
+          readBatches.add(1)
+        }
         numOutputRows += batch.numRows
         batch
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index a118cec0bb..7ca8e047f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -119,6 +119,21 @@ class PartitionBatchPruningSuite
     }
   }
 
+  // With the IN_MEMORY_PARTITION_PRUNING option disabled
+  test("disable IN_MEMORY_PARTITION_PRUNING") {
+    spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)
+
+    val df = sql("SELECT key FROM pruningData WHERE key = 1")
+    val result = df.collect().map(_(0)).toArray
+    assert(result.length === 1)
+
+    val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
+      case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
+    }.head
+    assert(readPartitions === 5)
+    assert(readBatches === 10)
+  }
+
   def checkBatchPruning(
       query: String,
       expectedReadPartitions: Int,