aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala20
1 files changed, 20 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index a3b4561b07..e80feeeab4 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,11 +17,15 @@
package org.apache.spark.executor
+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
+
+ @throws(classOf[IOException])
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ in.defaultReadObject()
+ // Get the hostname from cached data, since hostname is the order of number of nodes in
+ // cluster, so using cached hostname will decrease the object number and alleviate the GC
+ // overhead.
+ _hostname = TaskMetrics.getCachedHostName(_hostname)
+ }
}
private[spark] object TaskMetrics {
+ private val hostNameCache = new ConcurrentHashMap[String, String]()
+
def empty: TaskMetrics = new TaskMetrics
+
+ def getCachedHostName(host: String): String = {
+ val canonicalHost = hostNameCache.putIfAbsent(host, host)
+ if (canonicalHost != null) canonicalHost else host
+ }
}
/**