author     Davies Liu <davies@databricks.com>    2016-01-01 13:39:20 -0800
committer  Reynold Xin <rxin@databricks.com>     2016-01-01 13:39:20 -0800
commit     0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e
tree       831829e112b108c0da73492a351fe04c8f67f827
parent     6c20b3c0871609cbbb1de8e12edd3cad318fc14e
[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow
It is confusing that some operators output UnsafeRow while others do not, which makes it easy to introduce mistakes. This PR changes all operators (SparkPlan) to output only UnsafeRow and removes the rule that inserted Unsafe/Safe conversions. Operators that cannot produce UnsafeRow directly now apply an UnsafeProjection themselves.

Closes #10330

cc JoshRosen rxin

Author: Davies Liu <davies@databricks.com>

Closes #10511 from davies/unsafe_row.
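The pattern repeated throughout the diff below (e.g. in PhysicalRDD and HiveTableScan) is to project each operator's generic rows to UnsafeRow once per partition. A minimal sketch of that pattern, assuming the Spark 1.6-era internal row API; `toUnsafe` is a hypothetical helper used for illustration, not something introduced by this patch:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

// Hypothetical helper showing the conversion the patch inlines into each
// operator's doExecute(): every generic InternalRow becomes an UnsafeRow.
def toUnsafe(rdd: RDD[InternalRow], output: Seq[Attribute]): RDD[InternalRow] =
  rdd.mapPartitions { iter =>
    // Generated projections are not thread-safe, so build one per partition.
    val proj = UnsafeProjection.create(output.map(_.dataType).toArray)
    iter.map(proj)
  }
```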
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala  | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala  | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala  | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala  | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala  | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  | 58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala  | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala  | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala  | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala  | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala  | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala  | 108
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala  | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala  | 54
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala  | 164
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala  | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala  | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  | 15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala  | 3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  | 31
34 files changed, 74 insertions, 574 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index eadf5cba6d..022303239f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -904,8 +904,7 @@ class SQLContext private[sql](
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches = Seq(
- Batch("Add exchange", Once, EnsureRequirements(self)),
- Batch("Add row converters", Once, EnsureRowFormats)
+ Batch("Add exchange", Once, EnsureRequirements(self))
)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 62cbc518e0..7b4161930b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
@@ -50,26 +49,14 @@ case class Exchange(
case None => ""
}
- val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
+ val simpleNodeName = "Exchange"
s"$simpleNodeName$extraInfo"
}
- /**
- * Returns true iff we can support the data type, and we are not doing range partitioning.
- */
- private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]
-
override def outputPartitioning: Partitioning = newPartitioning
override def output: Seq[Attribute] = child.output
- // This setting is somewhat counterintuitive:
- // If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
- // so the planner inserts a converter to convert data into UnsafeRow if needed.
- override def outputsUnsafeRows: Boolean = tungstenMode
- override def canProcessSafeRows: Boolean = !tungstenMode
- override def canProcessUnsafeRows: Boolean = tungstenMode
-
/**
* Determines whether records must be defensively copied before being sent to the shuffle.
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -130,15 +117,7 @@ case class Exchange(
}
}
- @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
-
- private val serializer: Serializer = {
- if (tungstenMode) {
- new UnsafeRowSerializer(child.output.size)
- } else {
- new SparkSqlSerializer(sparkConf)
- }
- }
+ private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 5c01af011d..fc508bfafa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -99,10 +99,19 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
- override val outputsUnsafeRows: Boolean = false)
+ isUnsafeRow: Boolean = false)
extends LeafNode {
- protected override def doExecute(): RDD[InternalRow] = rdd
+ protected override def doExecute(): RDD[InternalRow] = {
+ if (isUnsafeRow) {
+ rdd
+ } else {
+ rdd.mapPartitionsInternal { iter =>
+ val proj = UnsafeProjection.create(schema)
+ iter.map(proj)
+ }
+ }
+ }
override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 91530bd637..c3683cc4e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -41,20 +41,11 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)
- override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
-
override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))
- private[this] val projection = {
- if (outputsUnsafeRows) {
- (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
- } else {
- (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
- }
- }
+ private[this] val projection =
+ (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 0c613e91b9..4db88a09d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -64,6 +64,7 @@ case class Generate(
child.execute().mapPartitionsInternal { iter =>
val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
val joinedRow = new JoinedRow
+ val proj = UnsafeProjection.create(output, output)
iter.flatMap { row =>
// we should always set the left (child output)
@@ -77,13 +78,14 @@ case class Generate(
} ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
// we leave the left side as the last element of its child output
// keep it the same as Hive does
- joinedRow.withRight(row)
+ proj(joinedRow.withRight(row))
}
}
} else {
child.execute().mapPartitionsInternal { iter =>
- iter.flatMap(row => boundGenerator.eval(row)) ++
- LazyIterator(() => boundGenerator.terminate())
+ val proj = UnsafeProjection.create(output, output)
+ (iter.flatMap(row => boundGenerator.eval(row)) ++
+ LazyIterator(() => boundGenerator.terminate())).map(proj)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index ba7f6287ac..59057bf966 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
/**
@@ -29,15 +29,20 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {
- private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
+ private val unsafeRows: Array[InternalRow] = {
+ val proj = UnsafeProjection.create(output, output)
+ rows.map(r => proj(r).copy()).toArray
+ }
+
+ private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
protected override def doExecute(): RDD[InternalRow] = rdd
override def executeCollect(): Array[InternalRow] = {
- rows.toArray
+ unsafeRows
}
override def executeTake(limit: Int): Array[InternalRow] = {
- rows.take(limit).toArray
+ unsafeRows.take(limit)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 24207cb46f..73dc8cb984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -39,10 +39,6 @@ case class Sort(
testSpillFrequency: Int = 0)
extends UnaryNode {
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
-
override def output: Seq[Attribute] = child.output
override def outputOrdering: Seq[SortOrder] = sortOrder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index fe9b2ad4a0..f20f32aace 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -97,17 +97,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
- /** Specifies whether this operator outputs UnsafeRows */
- def outputsUnsafeRows: Boolean = false
-
- /** Specifies whether this operator is capable of processing UnsafeRows */
- def canProcessUnsafeRows: Boolean = false
-
- /**
- * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
- * that are not UnsafeRows).
- */
- def canProcessSafeRows: Boolean = true
/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
@@ -115,18 +104,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Concrete implementations of SparkPlan should override doExecute instead.
*/
final def execute(): RDD[InternalRow] = {
- if (children.nonEmpty) {
- val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
- val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
- assert(!(hasSafeInputs && hasUnsafeInputs),
- "Child operators should output rows in the same format")
- assert(canProcessSafeRows || canProcessUnsafeRows,
- "Operator must be able to process at least one row format")
- assert(!hasSafeInputs || canProcessSafeRows,
- "Operator will receive safe rows as input but cannot process safe rows")
- assert(!hasUnsafeInputs || canProcessUnsafeRows,
- "Operator will receive unsafe rows as input but cannot process unsafe rows")
- }
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
doExecute()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index c941d673c7..b79d93d7ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -100,8 +100,6 @@ case class Window(
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
- override def canProcessUnsafeRows: Boolean = true
-
/**
* Create a bound ordering object for a given frame type and offset. A bound ordering object is
* used to determine which input row lies within the frame boundaries of an output row.
@@ -259,16 +257,16 @@ case class Window(
* @return the final resulting projection.
*/
private[this] def createResultProjection(
- expressions: Seq[Expression]): MutableProjection = {
+ expressions: Seq[Expression]): UnsafeProjection = {
val references = expressions.zipWithIndex.map{ case (e, i) =>
// Results of window expressions will be on the right side of child's output
BoundReference(child.output.size + i, e.dataType, e.nullable)
}
val unboundToRefMap = expressions.zip(references).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
- newMutableProjection(
+ UnsafeProjection.create(
projectList ++ patchedWindowExpression,
- child.output)()
+ child.output)
}
protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index c4587ba677..01d076678f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -49,10 +49,6 @@ case class SortBasedAggregate(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = false
- override def canProcessSafeRows: Boolean = true
-
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
override def requiredChildDistribution: List[Distribution] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index ac920aa8bc..6501634ff9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -87,6 +87,10 @@ class SortBasedAggregationIterator(
// The aggregation buffer used by the sort-based aggregation.
private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
+ // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
+ // compared to MutableRow (aggregation buffer) directly.
+ private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
+
protected def initialize(): Unit = {
if (inputIterator.hasNext) {
initializeBuffer(sortBasedAggregationBuffer)
@@ -110,7 +114,7 @@ class SortBasedAggregationIterator(
// We create a variable to track if we see the next group.
var findNextPartition = false
// firstRowInNextGroup is the first row of this group. We first process it.
- processRow(sortBasedAggregationBuffer, firstRowInNextGroup)
+ processRow(sortBasedAggregationBuffer, safeProj(firstRowInNextGroup))
// The search will stop when we see the next group or there is no
// input row left in the iter.
@@ -122,7 +126,7 @@ class SortBasedAggregationIterator(
// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
- processRow(sortBasedAggregationBuffer, currentRow)
+ processRow(sortBasedAggregationBuffer, safeProj(currentRow))
} else {
// We find a new group.
findNextPartition = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 9d758eb3b7..999ebb768a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -49,10 +49,6 @@ case class TungstenAggregate(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
-
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
override def producedAttributes: AttributeSet =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f19d72f067..af7237ef25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -36,10 +36,6 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
override private[sql] lazy val metrics = Map(
"numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
-
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
protected override def doExecute(): RDD[InternalRow] = {
@@ -80,12 +76,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
- override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-
- override def canProcessUnsafeRows: Boolean = true
-
- override def canProcessSafeRows: Boolean = true
}
/**
@@ -108,10 +98,6 @@ case class Sample(
{
override def output: Seq[Attribute] = child.output
- override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
-
protected override def doExecute(): RDD[InternalRow] = {
if (withReplacement) {
// Disable gap sampling since the gap sampling method buffers two rows internally,
@@ -135,8 +121,6 @@ case class Range(
output: Seq[Attribute])
extends LeafNode {
- override def outputsUnsafeRows: Boolean = true
-
protected override def doExecute(): RDD[InternalRow] = {
sqlContext
.sparkContext
@@ -199,9 +183,6 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
}
}
}
- override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
protected override def doExecute(): RDD[InternalRow] =
sparkContext.union(children.map(_.execute()))
}
@@ -268,12 +249,14 @@ case class TakeOrderedAndProject(
// and this ordering needs to be created on the driver in order to be passed into Spark core code.
private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
- // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
- @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
-
private def collectData(): Array[InternalRow] = {
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
- projection.map(data.map(_)).getOrElse(data)
+ if (projectList.isDefined) {
+ val proj = UnsafeProjection.create(projectList.get, child.output)
+ data.map(r => proj(r).copy())
+ } else {
+ data
+ }
}
override def executeCollect(): Array[InternalRow] = {
@@ -311,10 +294,6 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
protected override def doExecute(): RDD[InternalRow] = {
child.execute().coalesce(numPartitions, shuffle = false)
}
-
- override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
}
/**
@@ -327,10 +306,6 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
-
- override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
}
/**
@@ -343,10 +318,6 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
-
- override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
}
/**
@@ -371,10 +342,6 @@ case class MapPartitions[T, U](
child: SparkPlan) extends UnaryNode {
override def producedAttributes: AttributeSet = outputSet
- override def canProcessSafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def outputsUnsafeRows: Boolean = true
-
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { iter =>
val tBoundEncoder = tEncoder.bind(child.output)
@@ -394,11 +361,6 @@ case class AppendColumns[T, U](
child: SparkPlan) extends UnaryNode {
override def producedAttributes: AttributeSet = AttributeSet(newColumns)
- // We are using an unsafe combiner.
- override def canProcessSafeRows: Boolean = false
- override def canProcessUnsafeRows: Boolean = true
- override def outputsUnsafeRows: Boolean = true
-
override def output: Seq[Attribute] = child.output ++ newColumns
override protected def doExecute(): RDD[InternalRow] = {
@@ -428,10 +390,6 @@ case class MapGroups[K, T, U](
child: SparkPlan) extends UnaryNode {
override def producedAttributes: AttributeSet = outputSet
- override def canProcessSafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def outputsUnsafeRows: Boolean = true
-
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(groupingAttributes) :: Nil
@@ -472,10 +430,6 @@ case class CoGroup[Key, Left, Right, Result](
right: SparkPlan) extends BinaryNode {
override def producedAttributes: AttributeSet = outputSet
- override def canProcessSafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def outputsUnsafeRows: Boolean = true
-
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index aa7a668e0e..d80912309b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Accumulable, Accumulator, Accumulators}
@@ -39,9 +39,7 @@ private[sql] object InMemoryRelation {
storageLevel: StorageLevel,
child: SparkPlan,
tableName: Option[String]): InMemoryRelation =
- new InMemoryRelation(child.output, useCompression, batchSize, storageLevel,
- if (child.outputsUnsafeRows) child else ConvertToUnsafe(child),
- tableName)()
+ new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
}
/**
@@ -226,8 +224,6 @@ private[sql] case class InMemoryColumnarTableScan(
// The cached version does not change the outputOrdering of the original SparkPlan.
override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
- override def outputsUnsafeRows: Boolean = true
-
private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
// Returned filter predicate should return false iff it is impossible for the input expression
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index aab177b2e8..54275c2cc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -46,15 +46,8 @@ case class BroadcastNestedLoopJoin(
case BuildLeft => (right, left)
}
- override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows
- override def canProcessUnsafeRows: Boolean = true
-
private[this] def genResultProjection: InternalRow => InternalRow = {
- if (outputsUnsafeRows) {
UnsafeProjection.create(schema)
- } else {
- identity[InternalRow]
- }
}
override def outputPartitioning: Partitioning = streamed.outputPartitioning
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 81bfe4e67c..d9fa4c6b83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -81,10 +81,6 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
- override def canProcessSafeRows: Boolean = false
- override def canProcessUnsafeRows: Boolean = true
- override def outputsUnsafeRows: Boolean = true
-
override private[sql] lazy val metrics = Map(
"numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
"numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb961d97c3..7f9d9daa5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -44,10 +44,6 @@ trait HashJoin {
override def output: Seq[Attribute] = left.output ++ right.output
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
-
protected def buildSideKeyGenerator: Projection =
UnsafeProjection.create(buildKeys, buildPlan.output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index c6e5868187..6d464d6946 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -64,10 +64,6 @@ trait HashOuterJoin {
s"HashOuterJoin should not take $x as the JoinType")
}
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
-
protected def buildKeyGenerator: Projection =
UnsafeProjection.create(buildKeys, buildPlan.output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index f23a1830e9..3e0f74cd98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -33,10 +33,6 @@ trait HashSemiJoin {
override def output: Seq[Attribute] = left.output
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
-
protected def leftKeyGenerator: Projection =
UnsafeProjection.create(leftKeys, left.output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index efa7b49410..82498ee395 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -42,9 +42,6 @@ case class LeftSemiJoinBNL(
override def output: Seq[Attribute] = left.output
- override def outputsUnsafeRows: Boolean = streamed.outputsUnsafeRows
- override def canProcessUnsafeRows: Boolean = true
-
/** The Streamed Relation */
override def left: SparkPlan = streamed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 4bf7b521c7..812f881d06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -53,10 +53,6 @@ case class SortMergeJoin(
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
-
private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
// This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
keys.map(SortOrder(_, Ascending))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 7ce38ebdb3..c3a2bfc59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -89,10 +89,6 @@ case class SortMergeOuterJoin(
keys.map(SortOrder(_, Ascending))
}
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
-
private def createLeftKeyGenerator(): Projection =
UnsafeProjection.create(leftKeys, left.output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index 6a882c9234..e46217050b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -69,18 +69,6 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
*/
def close(): Unit
- /** Specifies whether this operator outputs UnsafeRows */
- def outputsUnsafeRows: Boolean = false
-
- /** Specifies whether this operator is capable of processing UnsafeRows */
- def canProcessUnsafeRows: Boolean = false
-
- /**
- * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
- * that are not UnsafeRows).
- */
- def canProcessSafeRows: Boolean = true
-
/**
* Returns the content through the [[Iterator]] interface.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
index 7321fc66b4..b7fa0c0202 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
@@ -47,11 +47,7 @@ case class NestedLoopJoinNode(
}
private[this] def genResultProjection: InternalRow => InternalRow = {
- if (outputsUnsafeRows) {
- UnsafeProjection.create(schema)
- } else {
- identity[InternalRow]
- }
+ UnsafeProjection.create(schema)
}
private[this] var currentRow: InternalRow = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index defcec95fb..efb4b09c16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -351,10 +351,6 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
def children: Seq[SparkPlan] = child :: Nil
- override def outputsUnsafeRows: Boolean = false
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true
-
protected override def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute().map(_.copy())
val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
@@ -400,13 +396,14 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
val unpickle = new Unpickler
val row = new GenericMutableRow(1)
val joined = new JoinedRow
+ val resultProj = UnsafeProjection.create(output, output)
outputIterator.flatMap { pickedResult =>
val unpickledBatch = unpickle.loads(pickedResult)
unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
}.map { result =>
row(0) = EvaluatePython.fromJava(result, udf.dataType)
- joined(queue.poll(), row)
+ resultProj(joined(queue.poll(), row))
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
deleted file mode 100644
index 5f8fc2de8b..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.rules.Rule
-
-/**
- * Converts Java-object-based rows into [[UnsafeRow]]s.
- */
-case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
-
- override def output: Seq[Attribute] = child.output
- override def outputPartitioning: Partitioning = child.outputPartitioning
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = false
- override def canProcessSafeRows: Boolean = true
- override protected def doExecute(): RDD[InternalRow] = {
- child.execute().mapPartitions { iter =>
- val convertToUnsafe = UnsafeProjection.create(child.schema)
- iter.map(convertToUnsafe)
- }
- }
-}
-
-/**
- * Converts [[UnsafeRow]]s back into Java-object-based rows.
- */
-case class ConvertToSafe(child: SparkPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
- override def outputPartitioning: Partitioning = child.outputPartitioning
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
- override def outputsUnsafeRows: Boolean = false
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false
- override protected def doExecute(): RDD[InternalRow] = {
- child.execute().mapPartitions { iter =>
- val convertToSafe = FromUnsafeProjection(child.output.map(_.dataType))
- iter.map(convertToSafe)
- }
- }
-}
-
-private[sql] object EnsureRowFormats extends Rule[SparkPlan] {
-
- private def onlyHandlesSafeRows(operator: SparkPlan): Boolean =
- operator.canProcessSafeRows && !operator.canProcessUnsafeRows
-
- private def onlyHandlesUnsafeRows(operator: SparkPlan): Boolean =
- operator.canProcessUnsafeRows && !operator.canProcessSafeRows
-
- private def handlesBothSafeAndUnsafeRows(operator: SparkPlan): Boolean =
- operator.canProcessSafeRows && operator.canProcessUnsafeRows
-
- override def apply(operator: SparkPlan): SparkPlan = operator.transformUp {
- case operator: SparkPlan if onlyHandlesSafeRows(operator) =>
- if (operator.children.exists(_.outputsUnsafeRows)) {
- operator.withNewChildren {
- operator.children.map {
- c => if (c.outputsUnsafeRows) ConvertToSafe(c) else c
- }
- }
- } else {
- operator
- }
- case operator: SparkPlan if onlyHandlesUnsafeRows(operator) =>
- if (operator.children.exists(!_.outputsUnsafeRows)) {
- operator.withNewChildren {
- operator.children.map {
- c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
- }
- }
- } else {
- operator
- }
- case operator: SparkPlan if handlesBothSafeAndUnsafeRows(operator) =>
- if (operator.children.map(_.outputsUnsafeRows).toSet.size != 1) {
- // If this operator's children produce both unsafe and safe rows,
- // convert everything unsafe rows.
- operator.withNewChildren {
- operator.children.map {
- c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
- }
- }
- } else {
- operator
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 911d12e93e..87bff3295f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -28,7 +28,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
val input = (1 to 1000).map(Tuple1.apply)
checkAnswer(
input.toDF(),
- plan => ConvertToSafe(Exchange(SinglePartition, ConvertToUnsafe(plan))),
+ plan => Exchange(SinglePartition, plan),
input.map(Row.fromTuple)
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
deleted file mode 100644
index faef76d52a..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Alias, Literal}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.IntegerType
-
-class ExpandSuite extends SparkPlanTest with SharedSQLContext {
- import testImplicits.localSeqToDataFrameHolder
-
- private def testExpand(f: SparkPlan => SparkPlan): Unit = {
- val input = (1 to 1000).map(Tuple1.apply)
- val projections = Seq.tabulate(2) { i =>
- Alias(BoundReference(0, IntegerType, false), "id")() :: Alias(Literal(i), "gid")() :: Nil
- }
- val attributes = projections.head.map(_.toAttribute)
- checkAnswer(
- input.toDF(),
- plan => Expand(projections, attributes, f(plan)),
- input.flatMap(i => Seq.tabulate(2)(j => Row(i._1, j)))
- )
- }
-
- test("inheriting child row type") {
- val exprs = AttributeReference("a", IntegerType, false)() :: Nil
- val plan = Expand(Seq(exprs), exprs, ConvertToUnsafe(LocalTableScan(exprs, Seq.empty)))
- assert(plan.outputsUnsafeRows, "Expand should inherits the created row type from its child.")
- }
-
- test("expanding UnsafeRows") {
- testExpand(ConvertToUnsafe)
- }
-
- test("expanding SafeRows") {
- testExpand(identity)
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
deleted file mode 100644
index 2328899bb2..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, Row}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{ArrayType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
-
-class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
-
- private def getConverters(plan: SparkPlan): Seq[SparkPlan] = plan.collect {
- case c: ConvertToUnsafe => c
- case c: ConvertToSafe => c
- }
-
- private val outputsSafe = ReferenceSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
- assert(!outputsSafe.outputsUnsafeRows)
- private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
- assert(outputsUnsafe.outputsUnsafeRows)
-
- test("planner should insert unsafe->safe conversions when required") {
- val plan = Limit(10, outputsUnsafe)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
- }
-
- test("filter can process unsafe rows") {
- val plan = Filter(IsNull(IsNull(Literal(1))), outputsUnsafe)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(getConverters(preparedPlan).size === 1)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("filter can process safe rows") {
- val plan = Filter(IsNull(IsNull(Literal(1))), outputsSafe)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(getConverters(preparedPlan).isEmpty)
- assert(!preparedPlan.outputsUnsafeRows)
- }
-
- test("coalesce can process unsafe rows") {
- val plan = Coalesce(1, outputsUnsafe)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(getConverters(preparedPlan).size === 1)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("except can process unsafe rows") {
- val plan = Except(outputsUnsafe, outputsUnsafe)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(getConverters(preparedPlan).size === 2)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("except requires all of its input rows' formats to agree") {
- val plan = Except(outputsSafe, outputsUnsafe)
- assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("intersect can process unsafe rows") {
- val plan = Intersect(outputsUnsafe, outputsUnsafe)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(getConverters(preparedPlan).size === 2)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("intersect requires all of its input rows' formats to agree") {
- val plan = Intersect(outputsSafe, outputsUnsafe)
- assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("execute() fails an assertion if inputs rows are of different formats") {
- val e = intercept[AssertionError] {
- Union(Seq(outputsSafe, outputsUnsafe)).execute()
- }
- assert(e.getMessage.contains("format"))
- }
-
- test("union requires all of its input rows' formats to agree") {
- val plan = Union(Seq(outputsSafe, outputsUnsafe))
- assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("union can process safe rows") {
- val plan = Union(Seq(outputsSafe, outputsSafe))
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(!preparedPlan.outputsUnsafeRows)
- }
-
- test("union can process unsafe rows") {
- val plan = Union(Seq(outputsUnsafe, outputsUnsafe))
- val preparedPlan = sqlContext.prepareForExecution.execute(plan)
- assert(preparedPlan.outputsUnsafeRows)
- }
-
- test("round trip with ConvertToUnsafe and ConvertToSafe") {
- val input = Seq(("hello", 1), ("world", 2))
- checkAnswer(
- sqlContext.createDataFrame(input),
- plan => ConvertToSafe(ConvertToUnsafe(plan)),
- input.map(Row.fromTuple)
- )
- }
-
- test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
- SQLContext.setActive(sqlContext)
- val schema = ArrayType(StringType)
- val rows = (1 to 100).map { i =>
- InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))
- }
- val relation = LocalTableScan(Seq(AttributeReference("t", schema)()), rows)
-
- val plan =
- DummyPlan(
- ConvertToSafe(
- ConvertToUnsafe(relation)))
- assert(plan.execute().collect().map(_.getUTF8String(0).toString) === (1 to 100).map(_.toString))
- }
-}
-
-case class DummyPlan(child: SparkPlan) extends UnaryNode {
-
- override protected def doExecute(): RDD[InternalRow] = {
- child.execute().mapPartitions { iter =>
- // This `DummyPlan` is in safe mode, so we don't need to do copy even we hold some
- // values gotten from the incoming rows.
- // we cache all strings here to make sure we have deep copied UTF8String inside incoming
- // safe InternalRow.
- val strings = new scala.collection.mutable.ArrayBuffer[UTF8String]
- iter.foreach { row =>
- strings += row.getArray(0).getUTF8String(0)
- }
- strings.map(InternalRow(_)).iterator
- }
- }
-
- override def output: Seq[Attribute] = Seq(AttributeReference("a", StringType)())
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index e5d34be4c6..af971dfc6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -99,7 +99,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
)
checkThatPlansAgree(
inputDf,
- p => ConvertToSafe(Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23)),
+ p => Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
ReferenceSort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8141136de5..1588728bdb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,11 +132,17 @@ case class HiveTableScan(
}
}
- protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
- hadoopReader.makeRDDForTable(relation.hiveQlTable)
- } else {
- hadoopReader.makeRDDForPartitionedTable(
- prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+ protected override def doExecute(): RDD[InternalRow] = {
+ val rdd = if (!relation.hiveQlTable.isPartitioned) {
+ hadoopReader.makeRDDForTable(relation.hiveQlTable)
+ } else {
+ hadoopReader.makeRDDForPartitionedTable(
+ prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+ }
+ rdd.mapPartitionsInternal { iter =>
+ val proj = UnsafeProjection.create(schema)
+ iter.map(proj)
+ }
}
override def output: Seq[Attribute] = attributes
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf565b..44dc68e6ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,18 +28,17 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.{SparkException, TaskContext}
private[hive]
case class InsertIntoHiveTable(
@@ -101,15 +100,17 @@ case class InsertIntoHiveTable(
writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+ val proj = FromUnsafeProjection(child.schema)
iterator.foreach { row =>
var i = 0
+ val safeRow = proj(row)
while (i < fieldOIs.length) {
- outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+ outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
i += 1
}
writerContainer
- .getLocalFileWriter(row, table.schema)
+ .getLocalFileWriter(safeRow, table.schema)
.write(serializer.serialize(outputData, standardOI))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a61e162f48..6ccd417819 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,7 +213,8 @@ case class ScriptTransformation(
child.execute().mapPartitions { iter =>
if (iter.hasNext) {
- processIterator(iter)
+ val proj = UnsafeProjection.create(schema)
+ processIterator(iter).map(proj)
} else {
// If the input iterator has no rows then do not launch the external script.
Iterator.empty
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 665e87e3e3..efbf9988dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -689,36 +688,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
-
- test("HadoopFsRelation produces UnsafeRow") {
- withTempTable("test_unsafe") {
- withTempPath { dir =>
- val path = dir.getCanonicalPath
- sqlContext.range(3).write.format(dataSourceName).save(path)
- sqlContext.read
- .format(dataSourceName)
- .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
- .load(path)
- .registerTempTable("test_unsafe")
-
- val df = sqlContext.sql(
- """SELECT COUNT(*)
- |FROM test_unsafe a JOIN test_unsafe b
- |WHERE a.id = b.id
- """.stripMargin)
-
- val plan = df.queryExecution.executedPlan
-
- assert(
- plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
- s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
- |$plan
- """.stripMargin)
-
- checkAnswer(df, Row(3))
- }
- }
- }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when