author    Reynold Xin <rxin@databricks.com>  2016-05-22 00:03:37 -0700
committer Reynold Xin <rxin@databricks.com>  2016-05-22 00:03:37 -0700
commit    845e447fa03bf0a53ed79fa7e240af94dc152d2c (patch)
tree      59fe8ef77f72b4670dbf9d10539495cf875e135e /sql
parent    a11175eecacd4a57325dab29fff9ebfad819f22f (diff)
[SPARK-15459][SQL] Make Range logical and physical explain consistent
## What changes were proposed in this pull request?

This patch simplifies the implementation of the Range operator and makes the explain string consistent between the logical plan and the physical plan. To do this, I changed RangeExec to embed a logical Range plan. Before this patch (note that the logical Range and physical Range actually output different information):

```
== Optimized Logical Plan ==
Range 0, 100, 2, 2, [id#8L]

== Physical Plan ==
*Range 0, 2, 2, 50, [id#8L]
```

After this patch, if the step size is 1:

```
== Optimized Logical Plan ==
Range (0, 100, splits=2)

== Physical Plan ==
*Range (0, 100, splits=2)
```

If the step size is not 1:

```
== Optimized Logical Plan ==
Range (0, 100, step=2, splits=2)

== Physical Plan ==
*Range (0, 100, step=2, splits=2)
```

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #13239 from rxin/SPARK-15459.
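To see the new output end to end, here is a minimal repro, assuming a Spark 2.0-era session bound to `spark` (this snippet is illustrative and not part of the patch):

```scala
// Illustrative repro, assuming a SparkSession in scope as `spark`.
// range(start, end, step, numPartitions) plans a logical Range operator;
// explain(true) prints the logical, optimized, and physical plans.
val df = spark.range(0, 100, 2, 2)
df.explain(true)
// With this patch, both the optimized logical plan and the physical plan
// render the operator as: Range (0, 100, step=2, splits=2)
```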
Diffstat (limited to 'sql')

 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 18
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala         | 16
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                        |  4
 sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala                 | 26
 sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala                            |  2
 5 files changed, 37 insertions(+), 29 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index bed48b6f61..b1b3e00de1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -431,8 +431,11 @@ case class Range(
end: Long,
step: Long,
numSlices: Int,
- output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
- require(step != 0, "step cannot be 0")
+ output: Seq[Attribute])
+ extends LeafNode with MultiInstanceRelation {
+
+ require(step != 0, s"step ($step) cannot be 0")
+
val numElements: BigInt = {
val safeStart = BigInt(start)
val safeEnd = BigInt(end)
@@ -444,13 +447,20 @@ case class Range(
}
}
- override def newInstance(): Range =
- Range(start, end, step, numSlices, output.map(_.newInstance()))
+ override def newInstance(): Range = copy(output = output.map(_.newInstance()))
override def statistics: Statistics = {
val sizeInBytes = LongType.defaultSize * numElements
Statistics(sizeInBytes = sizeInBytes)
}
+
+ override def simpleString: String = {
+ if (step == 1) {
+ s"Range ($start, $end, splits=$numSlices)"
+ } else {
+ s"Range ($start, $end, step=$step, splits=$numSlices)"
+ }
+ }
}
case class Aggregate(
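The new `simpleString` is now the single source of the operator's explain text. A standalone sketch of the same formatting rule, with the default step elided (the names here are illustrative, not Catalyst's):

```scala
// Sketch of the formatting rule added above: step is printed only when it
// differs from the default of 1; splits always names the slice count.
def rangeSimpleString(start: Long, end: Long, step: Long, numSlices: Int): String =
  if (step == 1) s"Range ($start, $end, splits=$numSlices)"
  else s"Range ($start, $end, step=$step, splits=$numSlices)"

// rangeSimpleString(0, 100, 1, 2) == "Range (0, 100, splits=2)"
// rangeSimpleString(0, 100, 2, 2) == "Range (0, 100, step=2, splits=2)"
```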
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 91e2e077cf..a4dc03cd8b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,8 +197,8 @@ class SessionCatalogSuite extends SparkFunSuite {
test("create temp table") {
val catalog = new SessionCatalog(newBasicCatalog())
- val tempTable1 = Range(1, 10, 1, 10, Seq())
- val tempTable2 = Range(1, 20, 2, 10, Seq())
+ val tempTable1 = Range(1, 10, 1, 10)
+ val tempTable2 = Range(1, 20, 2, 10)
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
assert(catalog.getTempTable("tbl1") == Option(tempTable1))
@@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop temp table") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- val tempTable = Range(1, 10, 2, 10, Seq())
+ val tempTable = Range(1, 10, 2, 10)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
@@ -304,7 +304,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("rename temp table") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- val tempTable = Range(1, 10, 2, 10, Seq())
+ val tempTable = Range(1, 10, 2, 10)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
@@ -383,7 +383,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("lookup table relation") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- val tempTable1 = Range(1, 10, 1, 10, Seq())
+ val tempTable1 = Range(1, 10, 1, 10)
val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
@@ -422,7 +422,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
// If database is explicitly specified, do not check temporary tables
- val tempTable = Range(1, 10, 1, 10, Seq())
+ val tempTable = Range(1, 10, 1, 10)
catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
// If database is not explicitly specified, check the current database
@@ -434,7 +434,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list tables without pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
- val tempTable = Range(1, 10, 2, 10, Seq())
+ val tempTable = Range(1, 10, 2, 10)
catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
assert(catalog.listTables("db1").toSet ==
@@ -451,7 +451,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list tables with pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
- val tempTable = Range(1, 10, 2, 10, Seq())
+ val tempTable = Range(1, 10, 2, 10)
catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
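These tests now construct `Range` without the explicit `Seq()` output argument, which implies a four-argument factory. That factory is not shown in this diff; a plausible sketch of such a companion `apply`, deriving the single `id` attribute (the body here is an assumption):

```scala
// Hypothetical companion apply (not shown in this diff): derives the default
// output attribute so test code can write Range(1, 10, 1, 10).
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object Range {
  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
    // One non-nullable LongType column named "id", matching spark.range's schema.
    val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
    new Range(start, end, step, numSlices, output)
  }
}
```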
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 664e7f5661..555a2f4c01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -360,8 +360,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
- case r @ logical.Range(start, end, step, numSlices, output) =>
- execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil
+ case r: logical.Range =>
+ execution.RangeExec(r) :: Nil
case logical.RepartitionByExpression(expressions, child, nPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
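Because `RangeExec` now takes the whole logical node, the strategy can use a plain type pattern instead of destructuring five fields. A toy illustration of that planner shape (the types are stand-ins, not Catalyst's):

```scala
// Toy planner illustrating the pattern change above: match on the node's
// type and hand the whole logical node to the physical operator.
sealed trait LogicalNode
final case class RangeNode(start: Long, end: Long, step: Long, numSlices: Int)
  extends LogicalNode

sealed trait PhysicalNode
final case class RangeExecNode(range: RangeNode) extends PhysicalNode

def plan(node: LogicalNode): Seq[PhysicalNode] = node match {
  case r: RangeNode => RangeExecNode(r) :: Nil // no field-by-field extraction
  case _            => Nil
}
```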
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d492fa7c41..89bde6ad73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
/** Physical plan for Project. */
@@ -305,22 +305,18 @@ case class SampleExec(
/**
- * Physical plan for range (generating a range of 64 bit numbers.
- *
- * @param start first number in the range, inclusive.
- * @param step size of the step increment.
- * @param numSlices number of partitions.
- * @param numElements total number of elements to output.
- * @param output output attributes.
+ * Physical plan for range (generating a range of 64 bit numbers).
*/
-case class RangeExec(
- start: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- output: Seq[Attribute])
+case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
extends LeafExecNode with CodegenSupport {
+ def start: Long = range.start
+ def step: Long = range.step
+ def numSlices: Int = range.numSlices
+ def numElements: BigInt = range.numElements
+
+ override val output: Seq[Attribute] = range.output
+
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -458,6 +454,8 @@ case class RangeExec(
}
}
}
+
+ override def simpleString: String = range.simpleString
}
/**
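Delegating `simpleString` to the embedded logical node makes the two explain strings equal by construction rather than by convention (the leading `*` seen in physical plans is added by whole-stage codegen, not by `RangeExec`). A minimal sketch of the invariant, using illustrative types:

```scala
// Minimal sketch of the invariant the patch establishes: the physical node
// forwards its display string to the logical node it embeds, so the two
// explain strings cannot drift apart.
final case class LogicalRange(start: Long, end: Long, numSlices: Int) {
  def simpleString: String = s"Range ($start, $end, splits=$numSlices)"
}
final case class PhysicalRange(range: LogicalRange) {
  def simpleString: String = range.simpleString // shared by construction
}

val logical = LogicalRange(0, 100, 2)
assert(PhysicalRange(logical).simpleString == logical.simpleString)
```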
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index e4d4cecd5b..cd434f7887 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -58,7 +58,7 @@ class CatalogSuite
}
private def createTempTable(name: String): Unit = {
- sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
+ sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true)
}
private def dropTable(name: String, db: Option[String] = None): Unit = {