author     jerryshao <saisai.shao@intel.com>            2015-07-14 19:54:02 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-07-14 19:54:02 -0700
commit     bb870e72f42b6ce8d056df259f6fcf41808d7ed2 (patch)
tree       1f91b06b141770b4f514ea1782243548ed31f50b
parent     f957796c4b3c3cd95edfc64500a045f7e810ee87 (diff)
[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead
Hostnames in TaskMetrics are created through deserialization. Since the number of distinct hostnames is only on the order of the number of nodes in the cluster, adding a cache layer to deduplicate these strings reduces memory usage and alleviates GC overhead, especially for long-running applications that generate jobs rapidly, such as Spark Streaming.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #5064 from jerryshao/SPARK-5523 and squashes the following commits:

3e2412a [jerryshao] Address the comments
b092a81 [Saisai Shao] Add a pool to cache the hostname
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala  20
1 file changed, 20 insertions(+), 0 deletions(-)
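Why the cache helps: each time a serialized TaskMetrics arrives, Java deserialization materializes a fresh String instance for the hostname rather than reusing an equal one already on the heap. The standalone sketch below is illustrative only and not part of the patch (DeserDemo and roundTrip are hypothetical names); it shows that a round-tripped string has equal content but is a distinct object:

    import java.io._

    object DeserDemo {
      // Serialize a string and read it back, the same way a TaskMetrics
      // instance would travel from executor to driver.
      def roundTrip(s: String): String = {
        val buf = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buf)
        out.writeObject(s)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
        in.readObject().asInstanceOf[String]
      }

      def main(args: Array[String]): Unit = {
        val host = "worker-1.example.com"
        val copy = roundTrip(host)
        println(copy == host)  // true: same characters
        println(copy eq host)  // false: a brand-new String object on the heap
      }
    }

Every deserialized TaskMetrics therefore carries its own hostname copy; over many batches these duplicates accumulate and churn the GC, which is what the cache below avoids.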
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index a3b4561b07..e80feeeab4 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,11 +17,15 @@
package org.apache.spark.executor
+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
+
+ @throws(classOf[IOException])
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ in.defaultReadObject()
+    // Get the hostname from the cache: since the number of distinct hostnames is only on the
+    // order of the number of nodes in the cluster, reusing a cached hostname decreases the
+    // number of objects and alleviates GC overhead.
+ _hostname = TaskMetrics.getCachedHostName(_hostname)
+ }
}
private[spark] object TaskMetrics {
+ private val hostNameCache = new ConcurrentHashMap[String, String]()
+
def empty: TaskMetrics = new TaskMetrics
+
+ def getCachedHostName(host: String): String = {
+ val canonicalHost = hostNameCache.putIfAbsent(host, host)
+ if (canonicalHost != null) canonicalHost else host
+ }
}
/**
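For reference, here is a minimal standalone sketch of the interning pattern that getCachedHostName implements (HostNamePool and intern are illustrative names, not from the patch). ConcurrentHashMap.putIfAbsent returns the value previously mapped to the key, or null when this call performed the insert, so concurrent callers all converge on a single canonical String instance without any explicit locking:

    import java.util.concurrent.ConcurrentHashMap

    object HostNamePool {
      private val pool = new ConcurrentHashMap[String, String]()

      // If the host is already in the pool, hand back the canonical copy;
      // otherwise this call's argument becomes the canonical copy.
      def intern(host: String): String = {
        val existing = pool.putIfAbsent(host, host)
        if (existing != null) existing else host
      }
    }

    // Usage: two equal-but-distinct strings collapse to a single object.
    // val a = HostNamePool.intern(new String("node-1"))
    // val b = HostNamePool.intern(new String("node-1"))
    // assert(a eq b)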