-rw-r--r--  core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java                 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala                     26
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala                22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala 10
4 files changed, 55 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 5f743b2885..d31eb449eb 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -215,6 +215,9 @@ public class TaskMemoryManager {
     logger.info(
       "{} bytes of memory were used by task {} but are not associated with specific consumers",
       memoryNotAccountedFor, taskAttemptId);
+    logger.info(
+      "{} bytes of memory are used for execution and {} bytes of memory are used for storage",
+      memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed());
   }
 }
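
The existing message reports task memory that no registered consumer claimed; the added one puts the pool-level execution and storage totals next to it, which helps distinguish a consumer that failed to release memory from general pool pressure. A minimal sketch of the idea behind the first figure, under hypothetical names (this is not Spark's actual bookkeeping, just the shape of it):

// Sketch only: "unaccounted" memory is a task's total footprint minus
// what its registered consumers collectively report.
object MemoryAccountingSketch {
  final case class Consumer(name: String, used: Long)

  def memoryNotAccountedFor(taskTotal: Long, consumers: Seq[Consumer]): Long =
    taskTotal - consumers.map(_.used).sum

  def main(args: Array[String]): Unit = {
    val consumers = Seq(Consumer("sorter", 4096), Consumer("hash map", 8192))
    // 16384 total - 12288 claimed = 4096 bytes unaccounted for.
    println(memoryNotAccountedFor(taskTotal = 16384, consumers))
  }
}
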
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 23ee4eff08..c3a2675ee5 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -32,6 +32,16 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
+ * A trait that allows a class to give [[SizeEstimator]] a more accurate size estimate.
+ * When a class extends this trait, [[SizeEstimator]] queries `estimatedSize` first.
+ * If `estimatedSize` returns a defined size, [[SizeEstimator]] uses it as the size of
+ * the object; otherwise, [[SizeEstimator]] falls back to its normal estimation.
+ */
+private[spark] trait SizeEstimation {
+  def estimatedSize: Option[Long]
+}
+
+/**
  * :: DeveloperApi ::
  * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
  * memory-aware caches.
@@ -199,10 +209,18 @@ object SizeEstimator extends Logging {
       // the size estimator since it references the whole REPL. Do nothing in this case. In
       // general all ClassLoaders and Classes will be shared between objects anyway.
     } else {
-      val classInfo = getClassInfo(cls)
-      state.size += alignSize(classInfo.shellSize)
-      for (field <- classInfo.pointerFields) {
-        state.enqueue(field.get(obj))
+      val estimatedSize = obj match {
+        case s: SizeEstimation => s.estimatedSize
+        case _ => None
+      }
+      if (estimatedSize.isDefined) {
+        state.size += estimatedSize.get
+      } else {
+        val classInfo = getClassInfo(cls)
+        state.size += alignSize(classInfo.shellSize)
+        for (field <- classInfo.pointerFields) {
+          state.enqueue(field.get(obj))
+        }
       }
     }
   }
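
To make the new hook concrete: a class opts in by mixing in SizeEstimation and returning Some(bytes) when it already knows its footprint, or None to keep the default reflective walk. Note the trait is private[spark], so only Spark-internal classes can implement it at this point. The sketch below is illustrative only; OffHeapBuffer, PlainRecord, bytesAllocated, and the package name are hypothetical, not part of this patch.

// Hypothetical package; implementors must live under org.apache.spark
// because the trait is private[spark].
package org.apache.spark.example

import org.apache.spark.util.SizeEstimation

// Tracks its own allocation, so it reports an exact size and
// SizeEstimator skips the reflective walk entirely.
class OffHeapBuffer(bytesAllocated: Long) extends SizeEstimation {
  override def estimatedSize: Option[Long] = Some(bytesAllocated)
}

// Declines to self-report: None sends SizeEstimator down its usual
// path (aligned shell size plus every reachable pointer field).
class PlainRecord(val x: Int) extends SizeEstimation {
  override def estimatedSize: Option[Long] = None
}
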
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 20550178fb..9b6261af12 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -60,6 +60,18 @@ class DummyString(val arr: Array[Char]) {
   @transient val hash32: Int = 0
 }
 
+class DummyClass8 extends SizeEstimation {
+  val x: Int = 0
+
+  override def estimatedSize: Option[Long] = Some(2015)
+}
+
+class DummyClass9 extends SizeEstimation {
+  val x: Int = 0
+
+  override def estimatedSize: Option[Long] = None
+}
+
 class SizeEstimatorSuite
   extends SparkFunSuite
   with BeforeAndAfterEach
@@ -214,4 +226,14 @@ class SizeEstimatorSuite
     // Class should be 32 bytes on s390x if recognised as 64 bit platform
     assertResult(32)(SizeEstimator.estimate(new DummyClass7))
   }
+
+  test("SizeEstimation can provide the estimated size") {
+    // DummyClass8 provides its size estimation.
+    assertResult(2015)(SizeEstimator.estimate(new DummyClass8))
+    assertResult(20206)(SizeEstimator.estimate(Array.fill(10)(new DummyClass8)))
+
+    // DummyClass9 does not provide its size estimation.
+    assertResult(16)(SizeEstimator.estimate(new DummyClass9))
+    assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass9)))
+  }
 }
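
The expected values in the new test follow from the patch plus ordinary object-layout arithmetic, assuming the 64-bit-with-compressed-oops layout this suite pins down: DummyClass8 reports 2015 bytes per instance, and a 10-element object array adds a 56-byte shell (16-byte array header + 10 x 4-byte compressed references), so 56 + 10 * 2015 = 20206. DummyClass9 returns None, so each instance falls back to the reflective estimate of 16 bytes (12-byte object header + 4-byte Int, padded to an 8-byte boundary), giving 56 + 10 * 16 = 216.
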
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index cc8abb1ba4..49ae09bf53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.unsafe.memory.MemoryLocation
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SizeEstimation, Utils}
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -189,7 +189,9 @@ private[execution] object HashedRelation {
  */
private[joins] final class UnsafeHashedRelation(
     private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]])
-  extends HashedRelation with Externalizable {
+  extends HashedRelation
+  with SizeEstimation
+  with Externalizable {
 
   private[joins] def this() = this(null) // Needed for serialization
@@ -215,6 +217,10 @@ private[joins] final class UnsafeHashedRelation(
     }
   }
 
+  override def estimatedSize: Option[Long] = {
+    Option(binaryMap).map(_.getTotalMemoryConsumption)
+  }
+
   override def get(key: InternalRow): Seq[InternalRow] = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
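
The Option(binaryMap) guard handles the relation's two phases: while the data still lives in the JavaHashMap (binaryMap is null), estimatedSize returns None and SizeEstimator falls back to its reflective estimate; once binaryMap has been populated with the relation's binary form, the exact byte count is reported via getTotalMemoryConsumption. A minimal sketch of that two-phase pattern, under hypothetical names (NativeTable and the package name are stand-ins, not Spark API):

// Hypothetical package; the trait is private[spark], so implementors
// must live under org.apache.spark.
package org.apache.spark.example

import org.apache.spark.util.SizeEstimation

// Stand-in for an off-heap structure that knows its own footprint,
// playing the role of BytesToBytesMap.getTotalMemoryConsumption above.
class NativeTable(val totalMemoryConsumption: Long)

class TwoPhaseRelation extends SizeEstimation {
  // Null until the native table is built (e.g. after deserialization).
  private var table: NativeTable = _

  def install(t: NativeTable): Unit = { table = t }

  // Option(null) is None: callers get the reflective estimate until the
  // table exists, and the exact byte count afterwards.
  override def estimatedSize: Option[Long] =
    Option(table).map(_.totalMemoryConsumption)
}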