diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-12-30 23:07:29 -0800 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2013-12-30 23:39:02 -0800 |
commit | daa7792ad654e24012439db79c5a7f4abf149dc1 (patch) | |
tree | 694942f3c72c9d5a3a70c4ebd222e575b4b4ffc4 /core | |
parent | 347fafe4fccc9345ed0ffa6c7863bc233c079b43 (diff) | |
download | spark-daa7792ad654e24012439db79c5a7f4abf149dc1.tar.gz spark-daa7792ad654e24012439db79c5a7f4abf149dc1.tar.bz2 spark-daa7792ad654e24012439db79c5a7f4abf149dc1.zip |
Refactor SamplingSizeTracker into SizeTrackingAppendOnlyMap
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala | 83 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala | 71 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala) | 14 |
3 files changed, 71 insertions, 97 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala b/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala deleted file mode 100644 index 3eb80661cb..0000000000 --- a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import org.apache.spark.util.SamplingSizeTracker.Sample - -/** - * Estimates the size of an object as it grows, in bytes. - * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, - * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). - * - * Users should call updateMade() every time their object is updated with new data, or - * flushSamples() if there is a non-linear change in object size (otherwise linear is assumed). - * Not threadsafe. - */ -private[spark] class SamplingSizeTracker(obj: AnyRef) { - /** - * Controls the base of the exponential which governs the rate of sampling. - * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. - */ - private val SAMPLE_GROWTH_RATE = 1.1 - - private var lastLastSample: Sample = _ - private var lastSample: Sample = _ - - private var numUpdates: Long = _ - private var nextSampleNum: Long = _ - - flushSamples() - - /** Called after a non-linear change in the tracked object. Takes a new sample. */ - def flushSamples() { - numUpdates = 0 - nextSampleNum = 1 - // Throw out both prior samples to avoid overestimating delta. - lastSample = Sample(SizeEstimator.estimate(obj), 0) - lastLastSample = lastSample - } - - /** To be called after an update to the tracked object. Amortized O(1) time. */ - def updateMade() { - numUpdates += 1 - if (nextSampleNum == numUpdates) { - lastLastSample = lastSample - lastSample = Sample(SizeEstimator.estimate(obj), numUpdates) - nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong - } - } - - /** Estimates the current size of the tracked object. O(1) time. */ - def estimateSize(): Long = { - val interpolatedDelta = - if (lastLastSample != null && lastLastSample != lastSample) { - (lastSample.size - lastLastSample.size).toDouble / - (lastSample.numUpdates - lastLastSample.numUpdates) - } else if (lastSample.numUpdates > 0) { - lastSample.size.toDouble / lastSample.numUpdates - } else { - 0 - } - val extrapolatedDelta = math.max(0, interpolatedDelta * (numUpdates - lastSample.numUpdates)) - (lastSample.size + extrapolatedDelta).toLong - } -} - -object SamplingSizeTracker { - case class Sample(size: Long, numUpdates: Long) -} diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala index ea0f2fd68f..e8401ab9d7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala @@ -17,28 +17,85 @@ package org.apache.spark.util.collection -import org.apache.spark.util.SamplingSizeTracker +import scala.collection.mutable.ArrayBuffer -/** Append-only map that keeps track of its estimated size in bytes. */ +import org.apache.spark.util.SizeEstimator +import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample + +/** + * Append-only map that keeps track of its estimated size in bytes. + * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, + * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). + */ private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] { - private val sizeTracker = new SamplingSizeTracker(this) + /** + * Controls the base of the exponential which governs the rate of sampling. + * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. + */ + private val SAMPLE_GROWTH_RATE = 1.1 + + /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */ + private val samples = new ArrayBuffer[Sample]() + + /** Total number of insertions and updates into the map since the last resetSamples(). */ + private var numUpdates: Long = _ - def estimateSize() = sizeTracker.estimateSize() + /** The value of 'numUpdates' at which we will take our next sample. */ + private var nextSampleNum: Long = _ + + /** The average number of bytes per update between our last two samples. */ + private var bytesPerUpdate: Double = _ + + resetSamples() + + /** Called after the map grows in size, as this can be a dramatic change for small objects. */ + def resetSamples() { + numUpdates = 1 + nextSampleNum = 1 + samples.clear() + takeSample() + } override def update(key: K, value: V): Unit = { super.update(key, value) - sizeTracker.updateMade() + numUpdates += 1 + if (nextSampleNum == numUpdates) { takeSample() } } override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { val newValue = super.changeValue(key, updateFunc) - sizeTracker.updateMade() + numUpdates += 1 + if (nextSampleNum == numUpdates) { takeSample() } newValue } + /** Takes a new sample of the current map's size. */ + def takeSample() { + samples += Sample(SizeEstimator.estimate(this), numUpdates) + // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change. + bytesPerUpdate = math.max(0, samples.toSeq.reverse match { + case latest :: previous :: tail => + (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) + case _ => + 0 + }) + nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong + } + override protected def growTable() { super.growTable() - sizeTracker.flushSamples() + resetSamples() + } + + /** Estimates the current size of the map in bytes. O(1) time. */ + def estimateSize(): Long = { + assert(samples.nonEmpty) + val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates) + (samples.last.size + extrapolatedDelta).toLong } } + +object SizeTrackingAppendOnlyMap { + case class Sample(size: Long, numUpdates: Long) +}
\ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala index 47e4723cf3..93f0c6a8e6 100644 --- a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala @@ -21,10 +21,10 @@ import scala.util.Random import org.scalatest.{BeforeAndAfterAll, FunSuite} -import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass +import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap} -class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll { +class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll { val NORMAL_ERROR = 0.20 val HIGH_ERROR = 0.30 @@ -70,24 +70,24 @@ class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll { } } -object SamplingSizeTrackerSuite { +object SizeTrackingAppendOnlyMapSuite { // Speed test, for reproducibility of results. // These could be highly non-deterministic in general, however. // Results: - // AppendOnlyMap: 30 ms - // SizeTracker: 45 ms + // AppendOnlyMap: 31 ms + // SizeTracker: 54 ms // SizeEstimator: 1500 ms def main(args: Array[String]) { val numElements = 100000 - val baseTimes = for (i <- 0 until 3) yield time { + val baseTimes = for (i <- 0 until 10) yield time { val map = new AppendOnlyMap[Int, LargeDummyClass]() for (i <- 0 until numElements) { map(i) = new LargeDummyClass() } } - val sampledTimes = for (i <- 0 until 3) yield time { + val sampledTimes = for (i <- 0 until 10) yield time { val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]() for (i <- 0 until numElements) { map(i) = new LargeDummyClass() |