author     Andrew Or <andrewor14@gmail.com>    2013-12-31 17:26:26 -0800
committer  Andrew Or <andrewor14@gmail.com>    2013-12-31 17:26:26 -0800
commit     8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a (patch)
tree       7596fccda6cba57b7fc8a716a0e8f48827c0e789 /core/src
parent     53d8d36684b16ae536a5e065e690bb21b9aadc49 (diff)
parent     375d11743cda0781e9fb929920851ebef424dcf6 (diff)
download   spark-8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a.tar.gz
           spark-8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a.tar.bz2
           spark-8bbe08b21ee6e48b5ba1e2c2a8b1c7eacde9603a.zip
Merge branch 'master' of github.com:andrewor14/incubator-spark
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala                  83
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala  71
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala)  14
3 files changed, 71 insertions(+), 97 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala b/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala
deleted file mode 100644
index 3eb80661cb..0000000000
--- a/core/src/main/scala/org/apache/spark/util/SamplingSizeTracker.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.apache.spark.util.SamplingSizeTracker.Sample
-
-/**
- * Estimates the size of an object as it grows, in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
- *
- * Users should call updateMade() every time their object is updated with new data, or
- * flushSamples() if there is a non-linear change in object size (otherwise linear is assumed).
- * Not threadsafe.
- */
-private[spark] class SamplingSizeTracker(obj: AnyRef) {
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  private var lastLastSample: Sample = _
-  private var lastSample: Sample = _
-
-  private var numUpdates: Long = _
-  private var nextSampleNum: Long = _
-
-  flushSamples()
-
-  /** Called after a non-linear change in the tracked object. Takes a new sample. */
-  def flushSamples() {
-    numUpdates = 0
-    nextSampleNum = 1
-    // Throw out both prior samples to avoid overestimating delta.
-    lastSample = Sample(SizeEstimator.estimate(obj), 0)
-    lastLastSample = lastSample
-  }
-
-  /** To be called after an update to the tracked object. Amortized O(1) time. */
-  def updateMade() {
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) {
-      lastLastSample = lastSample
-      lastSample = Sample(SizeEstimator.estimate(obj), numUpdates)
-      nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-    }
-  }
-
-  /** Estimates the current size of the tracked object. O(1) time. */
-  def estimateSize(): Long = {
-    val interpolatedDelta =
-      if (lastLastSample != null && lastLastSample != lastSample) {
-        (lastSample.size - lastLastSample.size).toDouble /
-          (lastSample.numUpdates - lastLastSample.numUpdates)
-      } else if (lastSample.numUpdates > 0) {
-        lastSample.size.toDouble / lastSample.numUpdates
-      } else {
-        0
-      }
-    val extrapolatedDelta = math.max(0, interpolatedDelta * (numUpdates - lastSample.numUpdates))
-    (lastSample.size + extrapolatedDelta).toLong
-  }
-}
-
-object SamplingSizeTracker {
-  case class Sample(size: Long, numUpdates: Long)
-}
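
For reference, the removed tracker extrapolates linearly from its last two samples: if it measured 1000 bytes at update 10 and 1800 bytes at update 20, then at update 25 it reports 1800 + ((1800 - 1000) / (20 - 10)) * 5 = 2200 bytes. A minimal sketch of how a caller was expected to drive it, assuming a hypothetical growing buffer (and noting that the class itself no longer exists after this commit):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.util.SamplingSizeTracker

    val buf = new ArrayBuffer[Array[Byte]]()   // hypothetical tracked object
    val tracker = new SamplingSizeTracker(buf)
    for (i <- 0 until 10000) {
      buf += new Array[Byte](64)
      tracker.updateMade()           // amortized O(1): SizeEstimator only runs at
                                     // update counts spaced by SAMPLE_GROWTH_RATE
    }
    println(tracker.estimateSize())  // O(1) extrapolation from the last two samples
    buf.clear()
    tracker.flushSamples()           // non-linear change: discard both prior samples
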
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index ea0f2fd68f..e6b6103d96 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -17,28 +17,85 @@
 
 package org.apache.spark.util.collection
 
-import org.apache.spark.util.SamplingSizeTracker
+import scala.collection.mutable.ArrayBuffer
 
-/** Append-only map that keeps track of its estimated size in bytes. */
+import org.apache.spark.util.SizeEstimator
+import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
+
+/**
+ * Append-only map that keeps track of its estimated size in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ */
 private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-  private val sizeTracker = new SamplingSizeTracker(this)
+  /**
+   * Controls the base of the exponential which governs the rate of sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
+
+  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
+  private val samples = new ArrayBuffer[Sample]()
+
+  /** Total number of insertions and updates into the map since the last resetSamples(). */
+  private var numUpdates: Long = _
+
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  /** The average number of bytes per update between our last two samples. */
+  private var bytesPerUpdate: Double = _
+
+  resetSamples()
 
-  def estimateSize() = sizeTracker.estimateSize()
+  /** Called after the map grows in size, as this can be a dramatic change for small objects. */
+  def resetSamples() {
+    numUpdates = 1
+    nextSampleNum = 1
+    samples.clear()
+    takeSample()
+  }
 
   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    sizeTracker.updateMade()
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) { takeSample() }
   }
 
   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    sizeTracker.updateMade()
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) { takeSample() }
     newValue
   }
 
+  /** Takes a new sample of the current map's size. */
+  def takeSample() {
+    samples += Sample(SizeEstimator.estimate(this), numUpdates)
+    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
+    bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      case _ =>
+        0
+    })
+    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
   override protected def growTable() {
     super.growTable()
-    sizeTracker.flushSamples()
+    resetSamples()
   }
+
+  /** Estimates the current size of the map in bytes. O(1) time. */
+  def estimateSize(): Long = {
+    assert(samples.nonEmpty)
+    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+    (samples.last.size + extrapolatedDelta).toLong
+  }
+}
+
+object SizeTrackingAppendOnlyMap {
+  case class Sample(size: Long, numUpdates: Long)
 }
 
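
The map now inlines the same scheme: SizeEstimator.estimate runs only when numUpdates reaches nextSampleNum, which grows geometrically by SAMPLE_GROWTH_RATE, so a map absorbing a million updates pays for roughly ln(10^6) / ln(1.1), about 145 samples, rather than a million. One subtlety: the latest :: previous :: tail cons pattern matches only a scala.List, so if samples.toSeq returns the ArrayBuffer itself rather than a List (as mutable collections of this Scala generation do), the reversed sequence never matches the first case and bytesPerUpdate stays 0; samples.toList.reverse would make the extrapolation arm reachable. A standalone sketch of the technique, decoupled from AppendOnlyMap (the sizeOf parameter is an assumption standing in for SizeEstimator.estimate):

    // Minimal sketch: exponential back-off sampling with linear extrapolation.
    // `sizeOf` is a stand-in for org.apache.spark.util.SizeEstimator.estimate.
    class SizeTrackerSketch(sizeOf: () => Long) {
      private val SAMPLE_GROWTH_RATE = 1.1
      private case class Sample(size: Long, numUpdates: Long)

      private var numUpdates = 1L                      // updates seen so far
      private var last = Sample(sizeOf(), numUpdates)  // most recent sample
      private var nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
      private var bytesPerUpdate = 0.0                 // slope between last two samples

      /** Record one update; resample only at geometrically spaced update counts. */
      def updateMade(): Unit = {
        numUpdates += 1
        if (numUpdates == nextSampleNum) {
          val s = Sample(sizeOf(), numUpdates)
          bytesPerUpdate =
            math.max(0.0, (s.size - last.size).toDouble / (s.numUpdates - last.numUpdates))
          last = s
          nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
        }
      }

      /** O(1): extrapolate from the last sample using the current slope. */
      def estimateSize(): Long =
        (last.size + bytesPerUpdate * (numUpdates - last.numUpdates)).toLong
    }

For instance, new SizeTrackerSketch(() => SizeEstimator.estimate(map)) would approximate what the map above does internally.
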
diff --git a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
index 47e4723cf3..93f0c6a8e6 100644
--- a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
@@ -21,10 +21,10 @@ import scala.util.Random
 
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass
+import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
 import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
 
-class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
+class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
   val NORMAL_ERROR = 0.20
   val HIGH_ERROR = 0.30
 
@@ -70,24 +70,24 @@ class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
   }
 }
 
-object SamplingSizeTrackerSuite {
+object SizeTrackingAppendOnlyMapSuite {
   // Speed test, for reproducibility of results.
   // These could be highly non-deterministic in general, however.
   // Results:
-  // AppendOnlyMap: 30 ms
-  // SizeTracker: 45 ms
+  // AppendOnlyMap: 31 ms
+  // SizeTracker: 54 ms
   // SizeEstimator: 1500 ms
   def main(args: Array[String]) {
     val numElements = 100000
 
-    val baseTimes = for (i <- 0 until 3) yield time {
+    val baseTimes = for (i <- 0 until 10) yield time {
       val map = new AppendOnlyMap[Int, LargeDummyClass]()
       for (i <- 0 until numElements) {
         map(i) = new LargeDummyClass()
       }
     }
 
-    val sampledTimes = for (i <- 0 until 3) yield time {
+    val sampledTimes = for (i <- 0 until 10) yield time {
       val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
       for (i <- 0 until numElements) {
         map(i) = new LargeDummyClass()