 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 009fbaa006..ba61940b3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.execution.columnar
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import org.apache.commons.lang.StringUtils
-import org.apache.spark.{Accumulable, Accumulator}
+import org.apache.spark.Accumulator
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
private[sql] object InMemoryRelation {
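
[Note] The import swap above tracks the accumulator API migration: scala.collection.JavaConverters._ comes in because the new accumulator exposes its value as a java.util.List, while ArrayBuffer and Accumulable go away with the old accumulableCollection API. A minimal sketch of the conversion JavaConverters enables (the names here are illustrative, not from this patch):

    import scala.collection.JavaConverters._

    val javaList = new java.util.ArrayList[Long]()
    javaList.add(1L); javaList.add(2L); javaList.add(3L)
    // asScala wraps the Java list in place, so Scala collection ops (map, sum) apply directly
    val total: Long = javaList.asScala.map(_ * 2).sum
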
@@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation(
tableName: Option[String])(
@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
@transient private[sql] var _statistics: Statistics = null,
- private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+ private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
extends logical.LeafNode with MultiInstanceRelation {
override def producedAttributes: AttributeSet = outputSet
- private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+ private[sql] val batchStats: ListAccumulator[InternalRow] =
if (_batchStats == null) {
- child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+ child.sqlContext.sparkContext.listAccumulator[InternalRow]
} else {
_batchStats
}
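
[Note] SparkContext.listAccumulator[T] both creates the new-style accumulator and registers it with AccumulatorContext, replacing accumulableCollection(ArrayBuffer.empty[T]). A hedged sketch of the usage pattern this hunk adopts, where sc stands for any live SparkContext (in the released 2.0.0 API this pair was renamed to CollectionAccumulator/collectionAccumulator; the names below follow this patch's snapshot):

    val acc = sc.listAccumulator[String]                  // created and registered on the driver
    sc.parallelize(Seq("a", "b")).foreach(s => acc.add(s)) // tasks append elements on executors
    val collected: java.util.List[String] = acc.value      // driver reads back a java.util.List
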
@@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation(
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
partitionStatistics.schema)
- batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+ batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
}
// Statistics propagation contracts:
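
[Note] The added asScala is needed because ListAccumulator.value returns a java.util.List[InternalRow], whereas the old Accumulable.value yielded the ArrayBuffer directly. Sketch of the read path with the types spelled out, reusing batchStats and sizeOfRow from the hunk above:

    // java.util.List -> scala mutable.Buffer wrapper (a view, no copy)
    val rows: scala.collection.mutable.Buffer[InternalRow] = batchStats.value.asScala
    val totalBytes: Long = rows.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
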
@@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation(
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))
- batchStats += stats
+ batchStats.add(stats)
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
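
[Note] On the write side, the Accumulable API overloaded += for adding elements; AccumulatorV2 uses an explicit add method, hence batchStats.add(stats). For orientation, here is a hedged sketch of roughly what a list-collecting AccumulatorV2 looks like; this is not the actual Spark source, just the shape of the API this patch moves to:

    import java.util.{ArrayList => JArrayList, List => JList}
    import org.apache.spark.util.AccumulatorV2

    class SketchListAccumulator[T] extends AccumulatorV2[T, JList[T]] {
      private val _list = new JArrayList[T]()
      override def isZero: Boolean = _list.isEmpty
      override def copy(): SketchListAccumulator[T] = {
        val acc = new SketchListAccumulator[T]
        acc._list.addAll(_list)   // copy carries the elements accumulated so far
        acc
      }
      override def reset(): Unit = _list.clear()
      override def add(v: T): Unit = _list.add(v)           // per-task accumulation
      override def merge(other: AccumulatorV2[T, JList[T]]): Unit =
        _list.addAll(other.value)                           // driver-side combine
      override def value: JList[T] = _list
    }
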