author     Yash Datta <Yash.Datta@guavus.com>   2015-09-18 08:22:38 -0700
committer  Yin Huai <yhuai@databricks.com>      2015-09-18 08:22:38 -0700
commit     20fd35dfd1ac402b622604e7bbedcc53a580b0a2 (patch)
tree       25523c5576a064b9033a2aca64636a4853b20508 /sql
parent     e3b5d6cb29e0f983fcc55920619e6433298955f5 (diff)
[SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan
Many of the fields in InMemoryColumnarTableScan and InMemoryRelation can be marked @transient, since the executors do not need them. This cuts a ~1000 ms job of mine to about 700 ms, and the serialized task size drops from ~2.8 MB to ~1300 KB.

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #8604 from saucam/serde.
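A minimal sketch of why @transient helps, using plain Java serialization outside Spark; the Relation class and its fields below are hypothetical stand-ins, not the actual Spark classes. A field marked @transient is skipped when the enclosing object is serialized, so anything that drags the object into a task ships far less data.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a plan node with a heavy driver-side field.
class Relation(
    @transient val buffers: Array[Byte], // skipped by Java serialization
    val name: String)                    // written as part of the object
  extends Serializable

object TransientDemo {
  // Serialize an object and return the number of bytes written.
  private def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val rel = new Relation(new Array[Byte](1 << 20), "scan") // ~1 MB field
    // Prints a few dozen bytes, not ~1 MB: the transient field is dropped
    // on the wire (and would come back as null after deserialization).
    println(s"serialized size: ${serializedSize(rel)} bytes")
  }
}

The patch applies the same idea to fields that only the driver needs, such as the child plan and the cached-buffer RDD handle.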
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 35
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 66d429bc06..d7e145f9c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
- child: SparkPlan,
+ @transient child: SparkPlan,
tableName: Option[String])(
- private var _cachedColumnBuffers: RDD[CachedBatch] = null,
- private var _statistics: Statistics = null,
+ @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+ @transient private var _statistics: Statistics = null,
private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
extends LogicalPlan with MultiInstanceRelation {
@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
_batchStats
}
- val partitionStatistics = new PartitionStatistics(output)
+ @transient val partitionStatistics = new PartitionStatistics(output)
private def computeSizeInBytes = {
val sizeOfRow: Expression =
@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
private[sql] case class InMemoryColumnarTableScan(
attributes: Seq[Attribute],
predicates: Seq[Expression],
- relation: InMemoryRelation)
+ @transient relation: InMemoryRelation)
extends LeafNode {
override def output: Seq[Attribute] = attributes
@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
// Returned filter predicate should return false iff it is impossible for the input expression
// to evaluate to `true' based on statistics collected about this partition batch.
- val buildFilter: PartialFunction[Expression, Expression] = {
+ @transient val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
(buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
@@ -268,16 +268,23 @@ private[sql] case class InMemoryColumnarTableScan(
readBatches.setValue(0)
}
- relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
- val partitionFilter = newPredicate(
- partitionFilters.reduceOption(And).getOrElse(Literal(true)),
- relation.partitionStatistics.schema)
+ // Using these variables here to avoid serialization of entire objects (if referenced directly)
+ // within the map Partitions closure.
+ val schema = relation.partitionStatistics.schema
+ val schemaIndex = schema.zipWithIndex
+ val relOutput = relation.output
+ val buffers = relation.cachedColumnBuffers
+
+ buffers.mapPartitions { cachedBatchIterator =>
+ val partitionFilter = newPredicate(
+ partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+ schema)
// Find the ordinals and data types of the requested columns. If none are requested, use the
// narrowest (the field with minimum default element size).
val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
val (narrowestOrdinal, narrowestDataType) =
- relation.output.zipWithIndex.map { case (a, ordinal) =>
+ relOutput.zipWithIndex.map { case (a, ordinal) =>
ordinal -> a.dataType
} minBy { case (_, dataType) =>
ColumnType(dataType).defaultSize
@@ -285,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
Seq(narrowestOrdinal) -> Seq(narrowestDataType)
} else {
attributes.map { a =>
- relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
+ relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
}.unzip
}
@@ -296,7 +303,7 @@ private[sql] case class InMemoryColumnarTableScan(
// Build column accessors
val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
ColumnAccessor(
- relation.output(batchColumnIndex).dataType,
+ relOutput(batchColumnIndex).dataType,
ByteBuffer.wrap(cachedBatch.buffers(batchColumnIndex)))
}
@@ -328,7 +335,7 @@ private[sql] case class InMemoryColumnarTableScan(
if (inMemoryPartitionPruningEnabled) {
cachedBatchIterator.filter { cachedBatch =>
if (!partitionFilter(cachedBatch.stats)) {
- def statsString: String = relation.partitionStatistics.schema.zipWithIndex.map {
+ def statsString: String = schemaIndex.map {
case (a, i) =>
val value = cachedBatch.stats.get(i, a.dataType)
s"${a.name}: $value"