 core/src/main/scala/org/apache/spark/MapOutputTracker.scala                     |  25
 core/src/main/scala/org/apache/spark/SparkContext.scala                         |  21
 core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala     |  28
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala                 |  63
 core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala                 | 110
 core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala              |  86
 core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala     | 171
 core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala         |  86
 docs/configuration.md                                                           |  11
 streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala            |   3
 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala       |  14
 streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala |  19
 12 files changed, 48 insertions(+), 589 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 8670f705cd..1b59beb8d6 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,7 +18,6 @@
package org.apache.spark
import java.io._
-import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
@@ -267,8 +266,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
/**
- * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
- * output information, which allows old output information based on a TTL.
+ * MapOutputTracker for the driver.
*/
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {
@@ -291,17 +289,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
// can be read locally, but may lead to more delay in scheduling if those locations are busy.
private val REDUCER_PREF_LOCS_FRACTION = 0.2
- /**
- * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
- * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
- * Other than these two scenarios, nothing should be dropped from this HashMap.
- */
- protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
- private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
-
- // For cleaning up TimeStampedHashMaps
- private val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
+ // HashMaps for storing mapStatuses and cached serialized statuses in the driver.
+ // Statuses are dropped only by explicit de-registering.
+ protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
+ private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
def registerShuffle(shuffleId: Int, numMaps: Int) {
if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
@@ -462,14 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerEndpoint = null
- metadataCleaner.cancel()
cachedSerializedStatuses.clear()
}
-
- private def cleanup(cleanupTime: Long) {
- mapStatuses.clearOldValues(cleanupTime)
- cachedSerializedStatuses.clearOldValues(cleanupTime)
- }
}
/**
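[Editor's note] The hunks above swap TimeStampedHashMap for a plain java.util.concurrent.ConcurrentHashMap exposed through its .asScala view. A minimal standalone sketch of that idiom (names are illustrative, not from the patch); note that put on the Scala view still returns an Option, so the existing isDefined check in registerShuffle keeps working:

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object ConcurrentMapViewSketch {
  def main(args: Array[String]): Unit = {
    // A thread-safe Java map seen through a Scala mutable.Map interface.
    val statuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

    // put() on the view returns Option[previous value], matching the
    // `mapStatuses.put(shuffleId, ...).isDefined` duplicate-registration check.
    assert(statuses.put(1, Array[Byte](1)).isEmpty)
    assert(statuses.put(1, Array[Byte](2)).isDefined)

    // Entries are only ever removed explicitly -- there is no TTL any more.
    statuses.clear()
  }
}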
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4a99c0b081..98075cef11 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,6 +21,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
+import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.UUID.randomUUID
@@ -32,6 +33,7 @@ import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
import scala.util.control.NonFatal
+import com.google.common.collect.MapMaker
import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -199,7 +201,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _
- private var _metadataCleaner: MetadataCleaner = _
private var _jobProgressListener: JobProgressListener = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
@@ -271,8 +272,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] val addedJars = HashMap[String, Long]()
// Keeps track of all persisted RDDs
- private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
- private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
+ private[spark] val persistentRdds = {
+ val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
+ map.asScala
+ }
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
def statusTracker: SparkStatusTracker = _statusTracker
@@ -439,8 +442,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_conf.set("spark.repl.class.uri", replUri)
}
- _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
-
_statusTracker = new SparkStatusTracker(this)
_progressBar =
@@ -1674,11 +1675,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
env.metricsSystem.report()
}
}
- if (metadataCleaner != null) {
- Utils.tryLogNonFatalError {
- metadataCleaner.cancel()
- }
- }
Utils.tryLogNonFatalError {
_cleaner.foreach(_.stop())
}
@@ -2085,11 +2081,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
- private[spark] def cleanup(cleanupTime: Long) {
- persistentRdds.clearOldValues(cleanupTime)
- }
-
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
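[Editor's note] persistentRdds now relies on Guava's MapMaker instead of TimeStampedWeakValueHashMap: values are held weakly, so RDD objects that are no longer referenced elsewhere can be garbage-collected without a periodic cleaner. A hedged sketch of the same pattern outside Spark (requires Guava on the classpath; key and value types are placeholders):

import java.util.concurrent.ConcurrentMap

import scala.collection.JavaConverters._

import com.google.common.collect.MapMaker

object WeakValuesMapSketch {
  def main(args: Array[String]): Unit = {
    // Concurrent map whose values are weakly referenced: once nothing else
    // points at a value, the GC may drop its entry -- no timer needed.
    val map: ConcurrentMap[Int, Object] = new MapMaker().weakValues().makeMap[Int, Object]()
    val view = map.asScala

    var payload: Object = new Object
    view(42) = payload
    assert(view.get(42).isDefined)

    payload = null   // drop the last strong reference
    System.gc()      // best effort; the entry may only vanish on a later access
  }
}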
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 7abcb29672..294e16cde1 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import scala.collection.JavaConverters._
@@ -27,7 +27,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -63,10 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
- private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
-
- private val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
+ private val shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState]
/**
* Get a ShuffleWriterGroup for the given map task, which will register it as complete
@@ -75,9 +72,12 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
new ShuffleWriterGroup {
- shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
- private val shuffleState = shuffleStates(shuffleId)
-
+ private val shuffleState: ShuffleState = {
+ // Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use
+ // .getOrElseUpdate() because that's actually NOT atomic.
+ shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
+ shuffleStates.get(shuffleId)
+ }
val openStartTime = System.nanoTime
val serializerInstance = serializer.newInstance()
val writers: Array[DiskBlockObjectWriter] = {
@@ -114,7 +114,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
/** Remove all the blocks / files related to a particular shuffle. */
private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
- shuffleStates.get(shuffleId) match {
+ Option(shuffleStates.get(shuffleId)) match {
case Some(state) =>
for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
@@ -131,11 +131,5 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
}
- private def cleanup(cleanupTime: Long) {
- shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
- }
-
- override def stop() {
- metadataCleaner.cancel()
- }
+ override def stop(): Unit = {}
}
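[Editor's note] The inline comment in the hunk above is the crux of this change: getOrElseUpdate on a wrapped ConcurrentHashMap is a non-atomic check-then-act, whereas putIfAbsent followed by get guarantees every caller observes the same ShuffleState. A standalone sketch of the two approaches (class and method names here are illustrative only):

import java.util.concurrent.ConcurrentHashMap

object AtomicInitSketch {
  final class ShuffleState(val numReducers: Int)

  private val states = new ConcurrentHashMap[Int, ShuffleState]

  // Safe: putIfAbsent is atomic on the underlying Java map, so two threads
  // racing on the same id always end up sharing one ShuffleState instance
  // (the losing thread's freshly built instance is simply discarded).
  def stateFor(id: Int, numReducers: Int): ShuffleState = {
    states.putIfAbsent(id, new ShuffleState(numReducers))
    states.get(id)
  }

  // Not safe: on the .asScala wrapper, getOrElseUpdate(id, ...) first checks
  // and then inserts as two separate steps, so concurrent callers can each
  // install and observe a different ShuffleState.
}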
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8caf9e5535..5c80ac17b8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,7 +19,9 @@ package org.apache.spark.storage
import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
@@ -75,7 +77,7 @@ private[spark] class BlockManager(
val diskBlockManager = new DiskBlockManager(this, conf)
- private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
+ private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]
private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
@@ -147,11 +149,6 @@ private[spark] class BlockManager(
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object
- private val metadataCleaner = new MetadataCleaner(
- MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
- private val broadcastCleaner = new MetadataCleaner(
- MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
-
// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
@@ -232,7 +229,7 @@ private[spark] class BlockManager(
*/
private def reportAllBlocks(): Unit = {
logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
- for ((blockId, info) <- blockInfo) {
+ for ((blockId, info) <- blockInfo.asScala) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
logError(s"Failed to report $blockId to master; giving up.")
@@ -313,7 +310,7 @@ private[spark] class BlockManager(
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
- blockInfo.get(blockId).map { info =>
+ blockInfo.asScala.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
// Assume that block is not in external block store
@@ -327,7 +324,7 @@ private[spark] class BlockManager(
* may not know of).
*/
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
- (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+ (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
}
/**
@@ -439,7 +436,7 @@ private[spark] class BlockManager(
}
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
- val info = blockInfo.get(blockId).orNull
+ val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
// Double check to make sure the block is still there. There is a small chance that the
@@ -447,7 +444,7 @@ private[spark] class BlockManager(
// Note that this only checks metadata tracking. If user intentionally deleted the block
// on disk or from off heap storage without using removeBlock, this conditional check will
// still pass but eventually we will get an exception because we can't find the block.
- if (blockInfo.get(blockId).isEmpty) {
+ if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
@@ -731,7 +728,7 @@ private[spark] class BlockManager(
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
- val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
+ val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
@@ -1032,7 +1029,7 @@ private[spark] class BlockManager(
data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
logInfo(s"Dropping block $blockId from memory")
- val info = blockInfo.get(blockId).orNull
+ val info = blockInfo.get(blockId)
// If the block has not already been dropped
if (info != null) {
@@ -1043,7 +1040,7 @@ private[spark] class BlockManager(
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
- } else if (blockInfo.get(blockId).isEmpty) {
+ } else if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
@@ -1095,7 +1092,7 @@ private[spark] class BlockManager(
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
- val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+ val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}
@@ -1105,7 +1102,7 @@ private[spark] class BlockManager(
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logDebug(s"Removing broadcast $broadcastId")
- val blocksToRemove = blockInfo.keys.collect {
+ val blocksToRemove = blockInfo.asScala.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1117,7 +1114,7 @@ private[spark] class BlockManager(
*/
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
- val info = blockInfo.get(blockId).orNull
+ val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
@@ -1141,36 +1138,6 @@ private[spark] class BlockManager(
}
}
- private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = {
- logInfo(s"Dropping non broadcast blocks older than $cleanupTime")
- dropOldBlocks(cleanupTime, !_.isBroadcast)
- }
-
- private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = {
- logInfo(s"Dropping broadcast blocks older than $cleanupTime")
- dropOldBlocks(cleanupTime, _.isBroadcast)
- }
-
- private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = {
- val iterator = blockInfo.getEntrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next()
- val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
- if (time < cleanupTime && shouldDrop(id)) {
- info.synchronized {
- val level = info.level
- if (level.useMemory) { memoryStore.remove(id) }
- if (level.useDisk) { diskStore.remove(id) }
- if (level.useOffHeap) { externalBlockStore.remove(id) }
- iterator.remove()
- logInfo(s"Dropped block $id")
- }
- val status = getCurrentBlockStatus(id, info)
- reportBlockStatus(id, info, status)
- }
- }
- }
-
private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
@@ -1248,8 +1215,6 @@ private[spark] class BlockManager(
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
}
- metadataCleaner.cancel()
- broadcastCleaner.cancel()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
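[Editor's note] Because blockInfo is now a raw ConcurrentHashMap rather than a Scala map, the call sites above fall into two patterns: keep the Java-style null check where get() is used directly, or go through Option(...) / the .asScala view where an Option or Scala collection API is expected. A compact sketch of both (hypothetical key and value types):

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object NullVsOptionSketch {
  final class BlockInfo(val useMemory: Boolean)

  private val blockInfo = new ConcurrentHashMap[String, BlockInfo]

  def main(args: Array[String]): Unit = {
    // Java-style: get() returns null on a miss, so the null check stays.
    val info = blockInfo.get("rdd_0_0")
    if (info != null) {
      // ... use info ...
    }

    // Scala-style: wrap the raw lookup, or use the asScala view, when an
    // Option or Scala collection operations are wanted.
    val viaOption: Option[BlockInfo] = Option(blockInfo.get("rdd_0_0"))
    val viaView: Option[BlockInfo] = blockInfo.asScala.get("rdd_0_0")
    assert(viaOption.isEmpty && viaView.isEmpty)

    // Iteration likewise goes through the view, e.g. blockInfo.asScala.keys.
  }
}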
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
deleted file mode 100644
index a8bbad0868..0000000000
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.{Timer, TimerTask}
-
-import org.apache.spark.{Logging, SparkConf}
-
-/**
- * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
- */
-private[spark] class MetadataCleaner(
- cleanerType: MetadataCleanerType.MetadataCleanerType,
- cleanupFunc: (Long) => Unit,
- conf: SparkConf)
- extends Logging
-{
- val name = cleanerType.toString
-
- private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
- private val periodSeconds = math.max(10, delaySeconds / 10)
- private val timer = new Timer(name + " cleanup timer", true)
-
-
- private val task = new TimerTask {
- override def run() {
- try {
- cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
- logInfo("Ran metadata cleaner for " + name)
- } catch {
- case e: Exception => logError("Error running cleanup task for " + name, e)
- }
- }
- }
-
- if (delaySeconds > 0) {
- logDebug(
- "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
- "and period of " + periodSeconds + " secs")
- timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
- }
-
- def cancel() {
- timer.cancel()
- }
-}
-
-private[spark] object MetadataCleanerType extends Enumeration {
-
- val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, BLOCK_MANAGER,
- SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
-
- type MetadataCleanerType = Value
-
- def systemProperty(which: MetadataCleanerType.MetadataCleanerType): String = {
- "spark.cleaner.ttl." + which.toString
- }
-}
-
-// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
-// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
-private[spark] object MetadataCleaner {
- def getDelaySeconds(conf: SparkConf): Int = {
- conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
- }
-
- def getDelaySeconds(
- conf: SparkConf,
- cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
- conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt
- }
-
- def setDelaySeconds(
- conf: SparkConf,
- cleanerType: MetadataCleanerType.MetadataCleanerType,
- delay: Int) {
- conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
- }
-
- /**
- * Set the default delay time (in seconds).
- * @param conf SparkConf instance
- * @param delay default delay time to set
- * @param resetAll whether to reset all to default
- */
- def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) {
- conf.set("spark.cleaner.ttl", delay.toString)
- if (resetAll) {
- for (cleanerType <- MetadataCleanerType.values) {
- System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
- }
- }
- }
-}
-
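[Editor's note] The deleted MetadataCleaner scheduled a daemon Timer that invoked a clearOldValues-style callback with a threshold derived from spark.cleaner.ttl. The patch removes the mechanism without a replacement, since the remaining maps are cleaned either explicitly or via weak references. Purely as a hypothetical aside (not part of this patch), code that still wants time-based eviction of a metadata map could use Guava's cache builder, which Spark already depends on:

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

object TtlCacheSketch {
  def main(args: Array[String]): Unit = {
    // Entries silently expire one hour after they are written -- roughly what
    // the removed Timer + clearOldValues(threshTime) combination achieved.
    val cache = CacheBuilder.newBuilder()
      .expireAfterWrite(3600, TimeUnit.SECONDS)
      .build[String, Array[Byte]]()

    cache.put("shuffle_0", Array[Byte](1, 2, 3))
    assert(cache.getIfPresent("shuffle_0") != null)   // still present
    assert(cache.getIfPresent("missing") == null)     // never inserted
  }
}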
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
deleted file mode 100644
index 65efeb1f4c..0000000000
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.Set
-
-private[spark] class TimeStampedHashSet[A] extends Set[A] {
- val internalMap = new ConcurrentHashMap[A, Long]()
-
- def contains(key: A): Boolean = {
- internalMap.contains(key)
- }
-
- def iterator: Iterator[A] = {
- val jIterator = internalMap.entrySet().iterator()
- jIterator.asScala.map(_.getKey)
- }
-
- override def + (elem: A): Set[A] = {
- val newSet = new TimeStampedHashSet[A]
- newSet ++= this
- newSet += elem
- newSet
- }
-
- override def - (elem: A): Set[A] = {
- val newSet = new TimeStampedHashSet[A]
- newSet ++= this
- newSet -= elem
- newSet
- }
-
- override def += (key: A): this.type = {
- internalMap.put(key, currentTime)
- this
- }
-
- override def -= (key: A): this.type = {
- internalMap.remove(key)
- this
- }
-
- override def empty: Set[A] = new TimeStampedHashSet[A]()
-
- override def size(): Int = internalMap.size()
-
- override def foreach[U](f: (A) => U): Unit = {
- val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
- f(iterator.next.getKey)
- }
- }
-
- /**
- * Removes old values that have timestamp earlier than `threshTime`
- */
- def clearOldValues(threshTime: Long) {
- val iterator = internalMap.entrySet().iterator()
- while(iterator.hasNext) {
- val entry = iterator.next()
- if (entry.getValue < threshTime) {
- iterator.remove()
- }
- }
- }
-
- private def currentTime: Long = System.currentTimeMillis()
-}
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
deleted file mode 100644
index 310c0c1094..0000000000
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.lang.ref.WeakReference
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.language.implicitConversions
-
-import org.apache.spark.Logging
-
-/**
- * A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped.
- *
- * If the value is garbage collected and the weak reference is null, get() will return a
- * non-existent value. These entries are removed from the map periodically (every N inserts), as
- * their values are no longer strongly reachable. Further, key-value pairs whose timestamps are
- * older than a particular threshold can be removed using the clearOldValues method.
- *
- * TimeStampedWeakValueHashMap exposes a scala.collection.mutable.Map interface, which allows it
- * to be a drop-in replacement for Scala HashMaps. Internally, it uses a Java ConcurrentHashMap,
- * so all operations on this HashMap are thread-safe.
- *
- * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed.
- */
-private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false)
- extends mutable.Map[A, B]() with Logging {
-
- import TimeStampedWeakValueHashMap._
-
- private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet)
- private val insertCount = new AtomicInteger(0)
-
- /** Return a map consisting only of entries whose values are still strongly reachable. */
- private def nonNullReferenceMap = internalMap.filter { case (_, ref) => ref.get != null }
-
- def get(key: A): Option[B] = internalMap.get(key)
-
- def iterator: Iterator[(A, B)] = nonNullReferenceMap.iterator
-
- override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
- val newMap = new TimeStampedWeakValueHashMap[A, B1]
- val oldMap = nonNullReferenceMap.asInstanceOf[mutable.Map[A, WeakReference[B1]]]
- newMap.internalMap.putAll(oldMap.toMap)
- newMap.internalMap += kv
- newMap
- }
-
- override def - (key: A): mutable.Map[A, B] = {
- val newMap = new TimeStampedWeakValueHashMap[A, B]
- newMap.internalMap.putAll(nonNullReferenceMap.toMap)
- newMap.internalMap -= key
- newMap
- }
-
- override def += (kv: (A, B)): this.type = {
- internalMap += kv
- if (insertCount.incrementAndGet() % CLEAR_NULL_VALUES_INTERVAL == 0) {
- clearNullValues()
- }
- this
- }
-
- override def -= (key: A): this.type = {
- internalMap -= key
- this
- }
-
- override def update(key: A, value: B): Unit = this += ((key, value))
-
- override def apply(key: A): B = internalMap.apply(key)
-
- override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = nonNullReferenceMap.filter(p)
-
- override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]()
-
- override def size: Int = internalMap.size
-
- override def foreach[U](f: ((A, B)) => U): Unit = nonNullReferenceMap.foreach(f)
-
- def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value)
-
- def toMap: Map[A, B] = iterator.toMap
-
- /** Remove old key-value pairs with timestamps earlier than `threshTime`. */
- def clearOldValues(threshTime: Long): Unit = internalMap.clearOldValues(threshTime)
-
- /** Remove entries with values that are no longer strongly reachable. */
- def clearNullValues() {
- val it = internalMap.getEntrySet.iterator
- while (it.hasNext) {
- val entry = it.next()
- if (entry.getValue.value.get == null) {
- logDebug("Removing key " + entry.getKey + " because it is no longer strongly reachable.")
- it.remove()
- }
- }
- }
-
- // For testing
-
- def getTimestamp(key: A): Option[Long] = {
- internalMap.getTimeStampedValue(key).map(_.timestamp)
- }
-
- def getReference(key: A): Option[WeakReference[B]] = {
- internalMap.getTimeStampedValue(key).map(_.value)
- }
-}
-
-/**
- * Helper methods for converting to and from WeakReferences.
- */
-private object TimeStampedWeakValueHashMap {
-
- // Number of inserts after which entries with null references are removed
- val CLEAR_NULL_VALUES_INTERVAL = 100
-
- /* Implicit conversion methods to WeakReferences. */
-
- implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v)
-
- implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = {
- kv match { case (k, v) => (k, toWeakReference(v)) }
- }
-
- implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = {
- (kv: (K, WeakReference[V])) => p(kv)
- }
-
- /* Implicit conversion methods from WeakReferences. */
-
- implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get
-
- implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = {
- v match {
- case Some(ref) => Option(fromWeakReference(ref))
- case None => None
- }
- }
-
- implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = {
- kv match { case (k, v) => (k, fromWeakReference(v)) }
- }
-
- implicit def fromWeakReferenceIterator[K, V](
- it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = {
- it.map(fromWeakReferenceTuple)
- }
-
- implicit def fromWeakReferenceMap[K, V](
- map: mutable.Map[K, WeakReference[V]]) : mutable.Map[K, V] = {
- mutable.Map(map.mapValues(fromWeakReference).toSeq: _*)
- }
-}
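[Editor's note] The deleted wrapper's contract -- get() stops returning a value once the referent has been garbage-collected -- comes straight from java.lang.ref.WeakReference. A small self-contained sketch of that behaviour, using the same best-effort GC loop as the test removed further down:

import java.lang.ref.WeakReference

object WeakReferenceSketch {
  def main(args: Array[String]): Unit = {
    var strong: Object = new Object
    val weak = new WeakReference[Object](strong)

    strong = null // drop the only strong reference
    val deadline = System.currentTimeMillis + 10000
    // Best effort: ask for GC until the referent is collected or we time out.
    while (weak.get != null && System.currentTimeMillis < deadline) {
      System.gc()
      Thread.sleep(100)
    }

    // Once collected, get() returns null -- which is why the deleted map had
    // to filter null referents out of every view it exposed.
    println(s"referent collected: ${weak.get == null}")
  }
}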
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
index 9b3169026c..25fc15dd54 100644
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.util
-import java.lang.ref.WeakReference
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -34,10 +32,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite {
testMap(new TimeStampedHashMap[String, String]())
testMapThreadSafety(new TimeStampedHashMap[String, String]())
- // Test TimeStampedWeakValueHashMap basic functionality
- testMap(new TimeStampedWeakValueHashMap[String, String]())
- testMapThreadSafety(new TimeStampedWeakValueHashMap[String, String]())
-
test("TimeStampedHashMap - clearing by timestamp") {
// clearing by insertion time
val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false)
@@ -68,86 +62,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite {
assert(map1.get("k2").isDefined)
}
- test("TimeStampedWeakValueHashMap - clearing by timestamp") {
- // clearing by insertion time
- val map = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = false)
- map("k1") = "v1"
- assert(map("k1") === "v1")
- Thread.sleep(10)
- val threshTime = System.currentTimeMillis
- assert(map.getTimestamp("k1").isDefined)
- assert(map.getTimestamp("k1").get < threshTime)
- map.clearOldValues(threshTime)
- assert(map.get("k1") === None)
-
- // clearing by modification time
- val map1 = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = true)
- map1("k1") = "v1"
- map1("k2") = "v2"
- assert(map1("k1") === "v1")
- Thread.sleep(10)
- val threshTime1 = System.currentTimeMillis
- Thread.sleep(10)
- assert(map1("k2") === "v2") // access k2 to update its access time to > threshTime
- assert(map1.getTimestamp("k1").isDefined)
- assert(map1.getTimestamp("k1").get < threshTime1)
- assert(map1.getTimestamp("k2").isDefined)
- assert(map1.getTimestamp("k2").get >= threshTime1)
- map1.clearOldValues(threshTime1) // should only clear k1
- assert(map1.get("k1") === None)
- assert(map1.get("k2").isDefined)
- }
-
- test("TimeStampedWeakValueHashMap - clearing weak references") {
- var strongRef = new Object
- val weakRef = new WeakReference(strongRef)
- val map = new TimeStampedWeakValueHashMap[String, Object]
- map("k1") = strongRef
- map("k2") = "v2"
- map("k3") = "v3"
- val isEquals = map("k1") == strongRef
- assert(isEquals)
-
- // clear strong reference to "k1"
- strongRef = null
- val startTime = System.currentTimeMillis
- System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
- System.runFinalization() // Make a best effort to call finalizer on all cleaned objects.
- while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
- System.gc()
- System.runFinalization()
- Thread.sleep(100)
- }
- assert(map.getReference("k1").isDefined)
- val ref = map.getReference("k1").get
- assert(ref.get === null)
- assert(map.get("k1") === None)
-
- // operations should only display non-null entries
- assert(map.iterator.forall { case (k, v) => k != "k1" })
- assert(map.filter { case (k, v) => k != "k2" }.size === 1)
- assert(map.filter { case (k, v) => k != "k2" }.head._1 === "k3")
- assert(map.toMap.size === 2)
- assert(map.toMap.forall { case (k, v) => k != "k1" })
- val buffer = new ArrayBuffer[String]
- map.foreach { case (k, v) => buffer += v.toString }
- assert(buffer.size === 2)
- assert(buffer.forall(_ != "k1"))
- val plusMap = map + (("k4", "v4"))
- assert(plusMap.size === 3)
- assert(plusMap.forall { case (k, v) => k != "k1" })
- val minusMap = map - "k2"
- assert(minusMap.size === 1)
- assert(minusMap.head._1 == "k3")
-
- // clear null values - should only clear k1
- map.clearNullValues()
- assert(map.getReference("k1") === None)
- assert(map.get("k1") === None)
- assert(map.get("k2").isDefined)
- assert(map.get("k2").get === "v2")
- }
-
/** Test basic operations of a Scala mutable Map. */
def testMap(hashMapConstructor: => mutable.Map[String, String]) {
def newMap() = hashMapConstructor
diff --git a/docs/configuration.md b/docs/configuration.md
index 7d743d572b..3ffc77dcc6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -824,17 +824,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.cleaner.ttl</code></td>
- <td>(infinite)</td>
- <td>
- Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks
- generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be
- forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in
- case of Spark Streaming applications). Note that any RDD that persists in memory for more than
- this duration will be cleared as well.
- </td>
-</tr>
-<tr>
<td><code>spark.executor.cores</code></td>
<td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
<td>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 61b230ab6f..b186d29761 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
import org.apache.spark.streaming.scheduler.JobGenerator
-import org.apache.spark.util.{MetadataCleaner, Utils}
private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
@@ -40,7 +40,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
- val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll
def createSparkConf(): SparkConf = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 91a43e14a8..c59348a89d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
+import org.apache.spark.util.{CallSite, Utils}
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -271,18 +271,6 @@ abstract class DStream[T: ClassTag] (
checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
)
- val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
- logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
- require(
- metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
- "It seems you are doing some DStream window operation or setting a checkpoint interval " +
- "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
- "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
- "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
- "set the Java cleaner delay to more than " +
- math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
- )
-
dependencies.foreach(_.validateAtStart())
logInfo("Slide time = " + slideDuration)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 860fac29c0..0ae4c45988 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -81,9 +81,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10s")
+ myConf.set("spark.dummyTimeConfig", "10s")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
}
test("from existing SparkContext") {
@@ -93,26 +93,27 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10s")
+ myConf.set("spark.dummyTimeConfig", "10s")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
}
test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10s")
+ myConf.set("spark.dummyTimeConfig", "10s")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register()
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
assert(
Utils.timeStringAsSeconds(cp.sparkConfPairs
- .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10)
+ .toMap.getOrElse("spark.dummyTimeConfig", "-1")) === 10)
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+ assert(
+ newCp.createSparkConf().getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
ssc = new StreamingContext(null, newCp, null)
- assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
}
test("checkPoint from conf") {
@@ -288,7 +289,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
test("stop gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("spark.cleaner.ttl", "3600s")
+ conf.set("spark.dummyTimeConfig", "3600s")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
logInfo("==================================\n\n\n")