aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-12-01 19:36:34 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-12-01 19:36:34 -0800
commit1ce4adf55b535518c2e63917a827fac1f2df4e8e (patch)
treee4ca6006d6a4f2e30f6d320e03bd3c0f7f7688ce /core
parente96a70d5ab2e2b43a2df17a550fa9ed2ee0001c4 (diff)
downloadspark-1ce4adf55b535518c2e63917a827fac1f2df4e8e.tar.gz
spark-1ce4adf55b535518c2e63917a827fac1f2df4e8e.tar.bz2
spark-1ce4adf55b535518c2e63917a827fac1f2df4e8e.zip
[SPARK-8414] Ensure context cleaner periodic cleanups
Garbage collection triggers cleanups. If the driver JVM is huge and there is little memory pressure, we may never clean up shuffle files on executors. This is a problem for long-running applications (e.g. streaming). Author: Andrew Or <andrew@databricks.com> Closes #10070 from andrewor14/periodic-gc.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala21
1 files changed, 20 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index d23c1533db..bc732535fe 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,12 +18,13 @@
package org.apache.spark
import java.lang.ref.{ReferenceQueue, WeakReference}
+import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
+ private val periodicGCService: ScheduledExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
+
+ /**
+ * How often to trigger a garbage collection in this JVM.
+ *
+ * This context cleaner triggers cleanups only when weak references are garbage collected.
+ * In long-running applications with large driver JVMs, where there is little memory pressure
+ * on the driver, this may happen very occasionally or not at all. Not cleaning at all may
+ * lead to executors running out of disk space after a while.
+ */
+ private val periodicGCInterval =
+ sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
+
/**
* Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
+ periodicGCService.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit = System.gc()
+ }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
/**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.interrupt()
}
cleaningThread.join()
+ periodicGCService.shutdown()
}
/** Register a RDD for cleanup when it is garbage collected. */