Diffstat (limited to 'sql/core/src')
 sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                    | 107
 sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala              |   1
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala |   8
 sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala              |   6
 sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala                          |  19
 sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala                           |   6
 sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala                |   2
 sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala                  |   2
 8 files changed, 99 insertions(+), 52 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3662ed74d2..d363cb000d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -101,17 +101,76 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
- override val nodeName: String,
- override val metadata: Map[String, String] = Map.empty,
- isUnsafeRow: Boolean = false,
- override val outputPartitioning: Partitioning = UnknownPartitioning(0))
+ override val nodeName: String) extends LeafNode {
+
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ rdd.mapPartitionsInternal { iter =>
+ val proj = UnsafeProjection.create(schema)
+ iter.map { r =>
+ numOutputRows += 1
+ proj(r)
+ }
+ }
+ }
+
+ override def simpleString: String = {
+ s"Scan $nodeName${output.mkString("[", ",", "]")}"
+ }
+}
+
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class DataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val metadata: Map[String, String] = Map.empty)
extends LeafNode with CodegenSupport {
+ override val nodeName: String = relation.toString
+
+ // Ignore rdd when checking results
+ override def sameResult(plan: SparkPlan): Boolean = plan match {
+ case other: DataSourceScan => relation == other.relation && metadata == other.metadata
+ case _ => false
+ }
+
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+ val outputUnsafeRows = relation match {
+ case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
+ !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+ case _: HadoopFsRelation => true
+ case _ => false
+ }
+
+ override val outputPartitioning = {
+ val bucketSpec = relation match {
+ // TODO: this should be closer to bucket planning.
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
+ case _ => None
+ }
+
+ def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+ throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+ s"(${output.map(_.name).mkString(", ")})")
+ }
+
+ bucketSpec.map { spec =>
+ val numBuckets = spec.numBuckets
+ val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+ HashPartitioning(bucketColumns, numBuckets)
+ }.getOrElse {
+ UnknownPartitioning(0)
+ }
+ }
+
protected override def doExecute(): RDD[InternalRow] = {
- val unsafeRow = if (isUnsafeRow) {
+ val unsafeRow = if (outputUnsafeRows) {
rdd
} else {
rdd.mapPartitionsInternal { iter =>
@@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD(
ctx.INPUT_ROW = row
ctx.currentVars = null
val columns2 = exprs.map(_.gen(ctx))
- val inputRow = if (isUnsafeRow) row else null
+ val inputRow = if (outputUnsafeRows) row else null
val scanRows = ctx.freshName("processRows")
ctx.addNewFunction(scanRows,
s"""
@@ -221,42 +280,8 @@ private[sql] case class PhysicalRDD(
}
}
-private[sql] object PhysicalRDD {
+private[sql] object DataSourceScan {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
-
- def createFromDataSource(
- output: Seq[Attribute],
- rdd: RDD[InternalRow],
- relation: BaseRelation,
- metadata: Map[String, String] = Map.empty): PhysicalRDD = {
-
- val outputUnsafeRows = relation match {
- case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
- !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
- case _: HadoopFsRelation => true
- case _ => false
- }
-
- val bucketSpec = relation match {
- // TODO: this should be closer to bucket planning.
- case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
- case _ => None
- }
-
- def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
- throw new AnalysisException(s"bucket column $colName not found in existing columns " +
- s"(${output.map(_.name).mkString(", ")})")
- }
-
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.map(toAttribute)
- val partitioning = HashPartitioning(bucketColumns, numBuckets)
- PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
- }.getOrElse {
- PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
- }
- }
}
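
The new DataSourceScan node above computes its output partitioning directly from the relation's bucket spec: when bucketing is enabled and the HadoopFsRelation carries a spec, the scan reports a HashPartitioning over the bucket columns, otherwise UnknownPartitioning(0). The following is a minimal, self-contained Scala sketch of that mapping only; BucketSpec, HashPartitioning, UnknownPartitioning and the string-valued columns are simplified stand-ins redeclared here so the example runs outside Spark, not the real classes.

object BucketPartitioningSketch {
  // Simplified stand-ins for Spark's BucketSpec and Partitioning hierarchy.
  case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String])
  sealed trait Partitioning
  case class HashPartitioning(bucketColumns: Seq[String], numBuckets: Int) extends Partitioning
  case class UnknownPartitioning(numPartitions: Int) extends Partitioning

  // Mirrors the shape of DataSourceScan.outputPartitioning: resolve each bucket
  // column against the scan's output, fail if one is missing, and fall back to
  // an unknown partitioning when the relation is not bucketed.
  def outputPartitioning(bucketSpec: Option[BucketSpec], output: Seq[String]): Partitioning = {
    def toColumn(name: String): String = output.find(_ == name).getOrElse {
      throw new IllegalArgumentException(
        s"bucket column $name not found in existing columns (${output.mkString(", ")})")
    }
    bucketSpec
      .map(spec => HashPartitioning(spec.bucketColumnNames.map(toColumn), spec.numBuckets))
      .getOrElse(UnknownPartitioning(0))
  }

  def main(args: Array[String]): Unit = {
    // Bucketed relation: rows are hash-partitioned by the bucket column.
    println(outputPartitioning(Some(BucketSpec(8, Seq("user_id"))), Seq("user_id", "ts")))
    // Non-bucketed relation: partitioning is unknown.
    println(outputPartitioning(None, Seq("user_id", "ts")))
  }
}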
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 52c2971b73..8fb4705581 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan {
case _: BroadcastHashJoin => "bhj"
case _: SortMergeJoin => "smj"
case _: PhysicalRDD => "rdd"
+ case _: DataSourceScan => "scan"
case _ => nodeName.toLowerCase
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2944a8f86f..1adf3b6676 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVectorUtils}
@@ -239,7 +239,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}
case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
- execution.PhysicalRDD.createFromDataSource(
+ execution.DataSourceScan(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -639,7 +639,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
- val scan = execution.PhysicalRDD.createFromDataSource(
+ val scan = execution.DataSourceScan(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -649,7 +649,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
- val scan = execution.PhysicalRDD.createFromDataSource(
+ val scan = execution.DataSourceScan(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 94d318e702..8a36d32240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph {
case "Subquery" if subgraph != null =>
// Subquery should not be included in WholeStageCodegen
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges)
+ case "ReusedExchange" =>
+ // Point to the re-used exchange
+ val node = exchanges(planInfo.children.head)
+ edges += SparkPlanGraphEdge(node.id, parent.id)
case name =>
val metrics = planInfo.metrics.map { metric =>
SQLPlanMetric(metric.name, metric.accumulatorId,
@@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph {
} else {
subgraph.nodes += node
}
- if (name == "ShuffleExchange" || name == "BroadcastExchange") {
+ if (name.contains("Exchange")) {
exchanges += planInfo -> node
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 3780cbbcc9..9130e77ea5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter
*
* @since 1.3.0
*/
-case class In(attribute: String, values: Array[Any]) extends Filter
+case class In(attribute: String, values: Array[Any]) extends Filter {
+ override def hashCode(): Int = {
+ var h = attribute.hashCode
+ values.foreach { v =>
+ h *= 41
+ h += v.hashCode()
+ }
+ h
+ }
+ override def equals(o: Any): Boolean = o match {
+ case In(a, vs) =>
+ a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2)
+ case _ => false
+ }
+ override def toString: String = {
+ s"In($attribute, [${values.mkString(",")}]"
+ }
+}
/**
* A filter that evaluates to `true` iff the attribute evaluates to null.
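
The equals/hashCode overrides added to In above are needed because the filter carries an Array, and Scala arrays compare by reference, so the default case-class equality would treat two logically identical In filters as different (which would break comparisons such as the metadata check in DataSourceScan.sameResult). A minimal sketch of the problem and the element-wise fix follows, using hypothetical NaiveIn/FixedIn stand-ins rather than the real filter class.

object InFilterEqualitySketch {
  // Default case-class equality delegates to Array.equals, i.e. reference equality.
  case class NaiveIn(attribute: String, values: Array[Any])

  // Element-wise equality and an order-sensitive hash, mirroring the overrides in filters.scala.
  case class FixedIn(attribute: String, values: Array[Any]) {
    override def hashCode(): Int = {
      var h = attribute.hashCode
      values.foreach { v =>
        h *= 41
        h += v.hashCode()
      }
      h
    }
    override def equals(o: Any): Boolean = o match {
      case FixedIn(a, vs) =>
        a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2)
      case _ => false
    }
  }

  def main(args: Array[String]): Unit = {
    println(NaiveIn("id", Array(1, 2)) == NaiveIn("id", Array(1, 2))) // false: arrays compare by reference
    println(FixedIn("id", Array(1, 2)) == FixedIn("id", Array(1, 2))) // true: values compared element-wise
  }
}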
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index dfffa58212..1ef517324d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
- assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
- assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+ assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
+ assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
df
}
assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2ff79a2316..19e34b45bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.PhysicalRDD => p
+ case p: execution.DataSourceScan => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index db72297537..62f991fc5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
try {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
- case p: execution.PhysicalRDD => p
+ case p: execution.DataSourceScan => p
} match {
case Seq(p) => p
case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")