author:    hyukjinkwon <gurwls223@gmail.com>  2016-05-19 11:54:50 -0700
committer: Andrew Or <andrew@databricks.com>  2016-05-19 11:54:50 -0700
commit:    f5065abf49dea0eac04b0ec219f2d832a0f6730a
tree:      c9612c349133102561377d05956201050dbbb74c
parent:    faafd1e9dbde1f9c2ad218b90c27c3f25f98e1a4
[SPARK-15322][SQL][FOLLOW-UP] Update deprecated accumulator usage into accumulatorV2
## What changes were proposed in this pull request?
This PR corrects another case that still uses the deprecated `accumulableCollection`, replacing it with `listAccumulator`; the previous PR appears to have missed this one.
Since `ArrayBuffer[InternalRow].asJava` is a `java.util.List[InternalRow]`, the replacement seems safe.
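
In other words, the replacement keeps the collection semantics but moves to the AccumulatorV2 API. A minimal sketch of the before/after shapes, assuming a Spark build from around this commit (where `ListAccumulator` and `SparkContext.listAccumulator` exist, per the diff below); `trackBatchStats`, `sc`, and `stats` are hypothetical names used only for illustration:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.ListAccumulator

// Hypothetical helper sketching the before/after shape of the change.
def trackBatchStats(sc: SparkContext, stats: InternalRow): Seq[InternalRow] = {
  // Before (deprecated Accumulable API):
  //   val batchStats = sc.accumulableCollection(ArrayBuffer.empty[InternalRow])
  //   batchStats += stats        // append an element
  //   batchStats.value           // ArrayBuffer[InternalRow]

  // After (AccumulatorV2 API, as used in this patch):
  val batchStats: ListAccumulator[InternalRow] = sc.listAccumulator[InternalRow]
  batchStats.add(stats)          // append an element
  batchStats.value.asScala       // java.util.List viewed as a Scala Buffer
}
```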
## How was this patch tested?
Covered by the related existing tests `InMemoryColumnarQuerySuite` and `CachedTableSuite`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #13187 from HyukjinKwon/SPARK-15322.
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 009fbaa006..ba61940b3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 
 import org.apache.commons.lang.StringUtils
 
-import org.apache.spark.{Accumulable, Accumulator}
+import org.apache.spark.Accumulator
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, ListAccumulator}
 
 private[sql] object InMemoryRelation {
 
@@ -67,14 +67,14 @@ private[sql] case class InMemoryRelation(
     tableName: Option[String])(
     @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
     @transient private[sql] var _statistics: Statistics = null,
-    private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+    private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override def producedAttributes: AttributeSet = outputSet
 
-  private[sql] val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+  private[sql] val batchStats: ListAccumulator[InternalRow] =
     if (_batchStats == null) {
-      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+      child.sqlContext.sparkContext.listAccumulator[InternalRow]
     } else {
       _batchStats
     }
@@ -87,7 +87,7 @@ private[sql] case class InMemoryRelation(
       output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
       partitionStatistics.schema)
 
-    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+    batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
   }
 
   // Statistics propagation contracts:
@@ -169,7 +169,7 @@ private[sql] case class InMemoryRelation(
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
 
-        batchStats += stats
+        batchStats.add(stats)
         CachedBatch(rowCount, columnBuilders.map { builder =>
           JavaUtils.bufferToArray(builder.build())
         }, stats)
```
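
One subtlety the import swap reflects: `ListAccumulator.value` returns a `java.util.List[InternalRow]`, so the driver-side aggregation in `sizeInBytes` needs `JavaConverters` where it previously operated on an `ArrayBuffer` directly. A self-contained, non-Spark sketch of that conversion (plain `Long` values stand in for the evaluated row sizes):

```scala
import java.util.{ArrayList => JArrayList, List => JList}

import scala.collection.JavaConverters._

object AsScalaSketch extends App {
  // A Java list, shaped like what ListAccumulator.value hands back.
  val rowSizes: JList[Long] = new JArrayList[Long]()
  rowSizes.add(8L)
  rowSizes.add(16L)

  // asScala wraps the Java list in a Scala Buffer view (no copy),
  // so the usual Scala collection ops apply, as in the patched sizeInBytes.
  val total: Long = rowSizes.asScala.sum
  println(total) // prints 24
}
```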