author     Hong Shen <hongshen@tencent.com>  2015-04-27 18:57:31 -0400
committer  Sean Owen <sowen@cloudera.com>    2015-04-27 18:59:45 -0400
commit     8e1c00dbf4b60962908626dead744e5d73c8085e (patch)
tree       2e31a37364741dd0e9f05258bfdca6a46a62bfc1 /core/src
parent     b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56 (diff)
download   spark-8e1c00dbf4b60962908626dead744e5d73c8085e.tar.gz
           spark-8e1c00dbf4b60962908626dead744e5d73c8085e.tar.bz2
           spark-8e1c00dbf4b60962908626dead744e5d73c8085e.zip
[SPARK-6738] [CORE] Improve the size estimate for large arrays
Currently, SizeEstimator.visitArray is not correct in the following case: the array has more than 200 elements (ARRAY_SIZE_FOR_SAMPLING) and its elements reference a shared object.

When I add a debug log in SizeTracker.scala:

```
System.err.println(s"numUpdates:$numUpdates, size:$ts, bytesPerUpdate:$bytesPerUpdate, cost time:$b")
```

I get the following log:

```
numUpdates:1, size:262448, bytesPerUpdate:0.0, cost time:35
numUpdates:2, size:420698, bytesPerUpdate:158250.0, cost time:35
numUpdates:4, size:420754, bytesPerUpdate:28.0, cost time:32
numUpdates:7, size:420754, bytesPerUpdate:0.0, cost time:27
numUpdates:12, size:420754, bytesPerUpdate:0.0, cost time:28
numUpdates:20, size:420754, bytesPerUpdate:0.0, cost time:25
numUpdates:32, size:420754, bytesPerUpdate:0.0, cost time:21
numUpdates:52, size:420754, bytesPerUpdate:0.0, cost time:20
numUpdates:84, size:420754, bytesPerUpdate:0.0, cost time:20
numUpdates:135, size:420754, bytesPerUpdate:0.0, cost time:20
numUpdates:216, size:420754, bytesPerUpdate:0.0, cost time:11
numUpdates:346, size:420754, bytesPerUpdate:0.0, cost time:6
numUpdates:554, size:488911, bytesPerUpdate:327.67788461538464, cost time:8
numUpdates:887, size:2312259426, bytesPerUpdate:6942253.798798799, cost time:198
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 3.0 GB to disk (1 time so far)
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```

But in fact the spilled file is only 162K:

```
$ ll -h /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
-rw-r----- 1 spark users 162K Apr 21 14:27 /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```

To establish a baseline, I changed visitArray to estimate every element exactly:

```
var size = 0L
for (i <- 0 until length) {
  val obj = JArray.get(array, i)
  size += SizeEstimator.estimate(obj, state.visited).toLong
}
state.size += size
```

which produces this log:

```
...
14895 277016088 566.9046118590662 time:8470
23832 281840544 552.3308270676691 time:8031
38132 289891824 539.8294729775092 time:7897
61012 302803640 563.0265734265735 time:13044
97620 322904416 564.3276223776223 time:13554
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 314.5 MB to disk (1 time so far)
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```

and a spilled file of 85M:

```
$ ll -h /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/
total 85M
-rw-r----- 1 spark users 85M Apr 14 11:46 temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```

With this patch applied, the log becomes:

```
...
numUpdates:32, size:365484, bytesPerUpdate:0.0, cost time:7
numUpdates:52, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:84, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:135, size:372208, bytesPerUpdate:131.84313725490196, cost time:86
numUpdates:216, size:379020, bytesPerUpdate:84.09876543209876, cost time:21
numUpdates:346, size:1865208, bytesPerUpdate:11432.215384615385, cost time:23
numUpdates:554, size:2052380, bytesPerUpdate:899.8653846153846, cost time:16
numUpdates:887, size:2142820, bytesPerUpdate:271.59159159159157, cost time:15
...
numUpdates:14895, size:251675500, bytesPerUpdate:438.5263157894737, cost time:13
numUpdates:23832, size:257010268, bytesPerUpdate:596.9305135951662, cost time:14
numUpdates:38132, size:263922396, bytesPerUpdate:483.3655944055944, cost time:15
numUpdates:61012, size:268962596, bytesPerUpdate:220.28846153846155, cost time:24
numUpdates:97620, size:286980644, bytesPerUpdate:492.1888111888112, cost time:22
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: Thread 53 spilling in-memory map of 328.7 MB to disk (1 time so far)
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/temp_local_9c109510-af16-4468-8f23-48cad04da88f
```

and the spilled file is 88M:

```
$ ll -h /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/
total 88M
-rw-r----- 1 spark users 88M Apr 21 14:45 temp_local_9c109510-af16-4468-8f23-48cad04da88f
```

The sampled estimate now tracks the exact per-element walk (88M vs. 85M spilled), instead of estimating a map at 3.0 GB that serializes to a 162K file.

Author: Hong Shen <hongshen@tencent.com>

Closes #5608 from shenh062326/my_change5 and squashes the following commits:

5506bae [Hong Shen] Fix compile error
c275dd3 [Hong Shen] Alter code style
fe202a2 [Hong Shen] Change the code style and add documentation.
a9fca84 [Hong Shen] Add test case for SizeEstimator
4877eee [Hong Shen] Improve estimate the size of a large array
a2ea7ac [Hong Shen] Alter code style
4c28e36 [Hong Shen] Improve estimate the size of a large array
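To make the arithmetic behind the fix concrete, here is a minimal, self-contained Scala sketch (hypothetical object and method names, not code from this patch) contrasting the old single-sample extrapolation with the patched two-sample rule. When a sample happens to hit a large object shared by many elements, the old rule multiplies that one-off cost by length/100; the new rule counts the larger sample once and extrapolates only from the smaller one:

```
object EstimationSketch {
  val ARRAY_SAMPLE_SIZE = 100 // mirrors SizeEstimator.ARRAY_SAMPLE_SIZE

  // Old rule: scale a single 100-element sample up to the whole array.
  def oldEstimate(sampleSum: Double, length: Int): Long =
    ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * sampleSum).toLong

  // New rule: take two disjoint samples; count the larger one once (it is the
  // one that visited the shared object) and extrapolate only from the smaller.
  def newEstimate(s1: Long, s2: Long, length: Int): Long =
    math.max(s1, s2) +
      (math.min(s1, s2) * ((length - ARRAY_SAMPLE_SIZE) / ARRAY_SAMPLE_SIZE)).toLong

  def main(args: Array[String]): Unit = {
    // A 100,000-element array whose elements point at one shared ~2 MB object:
    // one sample hits it (s1 = 2,004,000 bytes), the other does not (s2 = 4,000).
    println(oldEstimate(2004000.0, 100000))       // 2004000000 -- a ~2 GB estimate
    println(newEstimate(2004000L, 4000L, 100000)) // 2004000 + 4000 * 999 = 6000000
  }
}
```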
Diffstat (limited to 'core/src')
core/src/main/scala/org/apache/spark/util/SizeEstimator.scala      | 45 ++++++++++++++++++++++++++++++++++---------------
core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 18 ++++++++++++++++++
2 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 26ffbf9350..4dd7ab9e07 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -179,7 +179,7 @@ private[spark] object SizeEstimator extends Logging {
   }
 
   // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
-  private val ARRAY_SIZE_FOR_SAMPLING = 200
+  private val ARRAY_SIZE_FOR_SAMPLING = 400
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
 
   private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) {
@@ -204,25 +204,40 @@ private[spark] object SizeEstimator extends Logging {
         }
       } else {
         // Estimate the size of a large array by sampling elements without replacement.
-        var size = 0.0
+        // To exclude the shared objects that the array elements may link, sample twice
+        // and use the min one to calculate array size.
         val rand = new Random(42)
-        val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
-        var numElementsDrawn = 0
-        while (numElementsDrawn < ARRAY_SAMPLE_SIZE) {
-          var index = 0
-          do {
-            index = rand.nextInt(length)
-          } while (drawn.contains(index))
-          drawn.add(index)
-          val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
-          size += SizeEstimator.estimate(elem, state.visited)
-          numElementsDrawn += 1
-        }
-        state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
+        val drawn = new OpenHashSet[Int](2 * ARRAY_SAMPLE_SIZE)
+        val s1 = sampleArray(array, state, rand, drawn, length)
+        val s2 = sampleArray(array, state, rand, drawn, length)
+        val size = math.min(s1, s2)
+        state.size += math.max(s1, s2) +
+          (size * ((length - ARRAY_SAMPLE_SIZE) / (ARRAY_SAMPLE_SIZE))).toLong
       }
     }
   }
 
+  private def sampleArray(
+      array: AnyRef,
+      state: SearchState,
+      rand: Random,
+      drawn: OpenHashSet[Int],
+      length: Int): Long = {
+    var size = 0L
+    for (i <- 0 until ARRAY_SAMPLE_SIZE) {
+      var index = 0
+      do {
+        index = rand.nextInt(length)
+      } while (drawn.contains(index))
+      drawn.add(index)
+      val obj = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
+      if (obj != null) {
+        size += SizeEstimator.estimate(obj, state.visited).toLong
+      }
+    }
+    size
+  }
+
   private def primitiveSize(cls: Class[_]): Long = {
     if (cls == classOf[Byte]) {
       BYTE_SIZE
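One detail worth noting in the new helper: both sampleArray calls share the same drawn set, so the second sample is drawn only from index slots the first never touched, and no slot is counted twice. Below is a small standalone sketch of that rejection-sampling-without-replacement pattern, using scala.collection.mutable.HashSet as a stand-in for Spark's OpenHashSet:

```
import scala.collection.mutable
import scala.util.Random

object DisjointSampling {
  // Draw n distinct indices in [0, length), skipping anything already in
  // `drawn`; reusing `drawn` across calls keeps successive samples disjoint,
  // which is the role OpenHashSet plays for the two sampleArray calls.
  def sampleIndices(n: Int, length: Int, rand: Random,
      drawn: mutable.HashSet[Int]): Seq[Int] =
    Seq.fill(n) {
      var index = rand.nextInt(length)
      while (drawn.contains(index)) { // rejection step, as in sampleArray
        index = rand.nextInt(length)
      }
      drawn += index
      index
    }

  def main(args: Array[String]): Unit = {
    val rand = new Random(42)
    val drawn = mutable.HashSet.empty[Int]
    val first = sampleIndices(100, 100000, rand, drawn)
    val second = sampleIndices(100, 100000, rand, drawn)
    // The two samples never overlap, so min(s1, s2) is a second look at
    // entirely different elements of the array.
    assert(first.toSet.intersect(second.toSet).isEmpty)
  }
}
```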
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 67a9f75ff2..28915bd533 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, PrivateMethodTester}
 
 class DummyClass1 {}
@@ -96,6 +98,22 @@ class SizeEstimatorSuite
     // Past size 100, we sample 100 elements, but we should still get the right size.
     assertResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
+
+    val arr = new Array[Char](100000)
+    assertResult(200016)(SizeEstimator.estimate(arr))
+    assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr))))
+
+    val buf = new ArrayBuffer[DummyString]()
+    for (i <- 0 until 5000) {
+      buf.append(new DummyString(new Array[Char](10)))
+    }
+    assertResult(340016)(SizeEstimator.estimate(buf.toArray))
+
+    for (i <- 0 until 5000) {
+      buf.append(new DummyString(arr))
+    }
+    assertResult(683912)(SizeEstimator.estimate(buf.toArray))
+
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
     // 10 pointers plus 8-byte object
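As a cross-check, the first of the new expected values can be recomputed by hand. The sketch below assumes the deterministic layout this suite configures (64-bit JVM with compressed oops: a 12-byte object header plus a 4-byte array length field); the byte counts are layout assumptions, not values read from SizeEstimator itself:

```
// Hand-check of assertResult(200016)(SizeEstimator.estimate(arr)) for
// arr = new Array[Char](100000), assuming a 64-bit, compressed-oops layout:
// 12-byte object header + 4-byte length field + 2 bytes per Char, 8-byte aligned.
object CharArrayCheck extends App {
  val headerBytes = 12L + 4L                // object header + array length field
  val dataBytes   = 100000L * 2L            // a JVM Char is 2 bytes
  val total       = headerBytes + dataBytes // 200016, already 8-byte aligned
  assert(total == 200016L)
}
```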