about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala | 21
1 files changed, 20 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index d23c1533db..bc732535fe 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,12 +18,13 @@
package org.apache.spark
import java.lang.ref.{ReferenceQueue, WeakReference}
+import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
+ private val periodicGCService: ScheduledExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
+
+ /**
+ * How often to trigger a garbage collection in this JVM.
+ *
+ * This context cleaner triggers cleanups only when weak references are garbage collected.
+ * In long-running applications with large driver JVMs, where there is little memory pressure
+ * on the driver, this may happen very occasionally or not at all. Not cleaning at all may
+ * lead to executors running out of disk space after a while.
+ */
+ private val periodicGCInterval =
+ sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
+
/**
* Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
+ periodicGCService.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit = System.gc()
+ }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
/**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.interrupt()
}
cleaningThread.join()
+ periodicGCService.shutdown()
}
/** Register a RDD for cleanup when it is garbage collected. */