diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-31 17:26:26 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-31 17:26:26 -0800 |
commit | 8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a (patch) | |
tree | 7596fccda6cba57b7fc8a716a0e8f48827c0e789 /core/src/main | |
parent | 53d8d36684b16ae536a5e065e690bb21b9aadc49 (diff) | |
parent | 375d11743cda0781e9fb929920851ebef424dcf6 (diff) | |
download | spark-8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a.tar.gz spark-8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a.tar.bz2 spark-8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a.zip |
Merge branch 'master' of github.com:andrewor14/incubator-spark
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala | 83 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala | 71 |
2 files changed, 64 insertions, 90 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala b/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala deleted file mode 100644 index 3eb80661cb..0000000000 --- a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import org.apache.spark.util.SamplingSizeTracker.Sample - -/** - * Estimates the size of an object as it grows, in bytes. - * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, - * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). - * - * Users should call updateMade() every time their object is updated with new data, or - * flushSamples() if there is a non-linear change in object size (otherwise linear is assumed). - * Not threadsafe. - */ -private[spark] class SamplingSizeTracker(obj: AnyRef) { - /** - * Controls the base of the exponential which governs the rate of sampling. - * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. - */ - private val SAMPLE_GROWTH_RATE = 1.1 - - private var lastLastSample: Sample = _ - private var lastSample: Sample = _ - - private var numUpdates: Long = _ - private var nextSampleNum: Long = _ - - flushSamples() - - /** Called after a non-linear change in the tracked object. Takes a new sample. */ - def flushSamples() { - numUpdates = 0 - nextSampleNum = 1 - // Throw out both prior samples to avoid overestimating delta. - lastSample = Sample(SizeEstimator.estimate(obj), 0) - lastLastSample = lastSample - } - - /** To be called after an update to the tracked object. Amortized O(1) time. */ - def updateMade() { - numUpdates += 1 - if (nextSampleNum == numUpdates) { - lastLastSample = lastSample - lastSample = Sample(SizeEstimator.estimate(obj), numUpdates) - nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong - } - } - - /** Estimates the current size of the tracked object. O(1) time. */ - def estimateSize(): Long = { - val interpolatedDelta = - if (lastLastSample != null && lastLastSample != lastSample) { - (lastSample.size - lastLastSample.size).toDouble / - (lastSample.numUpdates - lastLastSample.numUpdates) - } else if (lastSample.numUpdates > 0) { - lastSample.size.toDouble / lastSample.numUpdates - } else { - 0 - } - val extrapolatedDelta = math.max(0, interpolatedDelta * (numUpdates - lastSample.numUpdates)) - (lastSample.size + extrapolatedDelta).toLong - } -} - -object SamplingSizeTracker { - case class Sample(size: Long, numUpdates: Long) -} diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala index ea0f2fd68f..e6b6103d96 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala @@ -17,28 +17,85 @@ package org.apache.spark.util.collection -import org.apache.spark.util.SamplingSizeTracker +import scala.collection.mutable.ArrayBuffer -/** Append-only map that keeps track of its estimated size in bytes. */ +import org.apache.spark.util.SizeEstimator +import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample + +/** + * Append-only map that keeps track of its estimated size in bytes. + * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, + * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). + */ private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] { - private val sizeTracker = new SamplingSizeTracker(this) + /** + * Controls the base of the exponential which governs the rate of sampling. + * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. + */ + private val SAMPLE_GROWTH_RATE = 1.1 + + /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */ + private val samples = new ArrayBuffer[Sample]() + + /** Total number of insertions and updates into the map since the last resetSamples(). */ + private var numUpdates: Long = _ + + /** The value of 'numUpdates' at which we will take our next sample. */ + private var nextSampleNum: Long = _ + + /** The average number of bytes per update between our last two samples. */ + private var bytesPerUpdate: Double = _ + + resetSamples() - def estimateSize() = sizeTracker.estimateSize() + /** Called after the map grows in size, as this can be a dramatic change for small objects. */ + def resetSamples() { + numUpdates = 1 + nextSampleNum = 1 + samples.clear() + takeSample() + } override def update(key: K, value: V): Unit = { super.update(key, value) - sizeTracker.updateMade() + numUpdates += 1 + if (nextSampleNum == numUpdates) { takeSample() } } override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { val newValue = super.changeValue(key, updateFunc) - sizeTracker.updateMade() + numUpdates += 1 + if (nextSampleNum == numUpdates) { takeSample() } newValue } + /** Takes a new sample of the current map's size. */ + def takeSample() { + samples += Sample(SizeEstimator.estimate(this), numUpdates) + // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change. + bytesPerUpdate = math.max(0, samples.toSeq.reverse match { + case latest :: previous :: tail => + (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) + case _ => + 0 + }) + nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong + } + override protected def growTable() { super.growTable() - sizeTracker.flushSamples() + resetSamples() } + + /** Estimates the current size of the map in bytes. O(1) time. */ + def estimateSize(): Long = { + assert(samples.nonEmpty) + val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates) + (samples.last.size + extrapolatedDelta).toLong + } +} + +object SizeTrackingAppendOnlyMap { + case class Sample(size: Long, numUpdates: Long) } |