author    Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
committer Josh Rosen <joshrosen@databricks.com>    2016-03-23 10:15:23 -0700
commit    3de24ae2ed6c58fc96a7e50832afe42fe7af34fb (patch)
tree      0eb9f5d7100301195e6d0c1b77114e2398f6edb3 /streaming
parent    6ce008ba46aa1fc8a5c222ce0f25a6d81f53588e (diff)
[SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.

- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
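The `BlockEvictionHandler` change itself lands outside the `streaming` module, so it does not appear in the diff below. As a rough illustration of the decoupling the commit message describes, a minimal sketch of the pattern might look like the following (signatures simplified and hypothetical, not copied from the commit):

    import org.apache.spark.storage.{BlockId, StorageLevel}

    // Narrow seam between MemoryStore and BlockManager: eviction is the
    // only BlockManager behavior the store needs, so it is expressed as
    // a small trait rather than a dependency on the whole BlockManager.
    trait BlockEvictionHandler {
      def dropFromMemory(blockId: BlockId, data: () => Any): StorageLevel
    }

    // In a test, a lightweight stub stands in for a fully constructed
    // BlockManager and simply records which blocks were evicted.
    val droppedBlocks = scala.collection.mutable.ArrayBuffer.empty[BlockId]
    val stubHandler = new BlockEvictionHandler {
      override def dropFromMemory(blockId: BlockId, data: () => Any): StorageLevel = {
        droppedBlocks += blockId  // record the eviction instead of performing it
        StorageLevel.NONE
      }
    }

Because the store only sees the trait, a `MemoryStoreSuite` can exercise unrolling and eviction paths against the stub without standing up RPC, shuffle, or disk machinery.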
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala       |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala         |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala       |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala             | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala  |  9
5 files changed, 19 insertions(+), 12 deletions(-)
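The hunks that follow all apply one mechanical change: call sites that previously reached (de)serialization through `BlockManager` now go through `SerializerManager`, obtained either from `SparkEnv` or via constructor injection. A sketch of the new call pattern (the helper name `deserializeLoggedBlock` is hypothetical; the `dataDeserialize` signature and imports are assumed to match their use in the hunks below):

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.BlockId
    import org.apache.spark.util.io.ChunkedByteBuffer

    // Hypothetical helper illustrating the migrated call site.
    def deserializeLoggedBlock[T](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
      // Before this commit: SparkEnv.get.blockManager.dataDeserialize(blockId, bytes)
      SparkEnv.get.serializerManager
        .dataDeserialize(blockId, bytes)
        .asInstanceOf[Iterator[T]]
    }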
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ace67a639c..c56520b1e2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -115,6 +115,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
assertValid()
val hadoopConf = broadcastedHadoopConf.value
val blockManager = SparkEnv.get.blockManager
+ val serializerManager = SparkEnv.get.serializerManager
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockId = partition.blockId
@@ -161,7 +162,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
- blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+ serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
.asInstanceOf[Iterator[T]]
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 6d4f4b99c1..85350ff658 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
@@ -123,6 +124,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult(
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
blockManager: BlockManager,
+ serializerManager: SerializerManager,
streamId: Int,
storageLevel: StorageLevel,
conf: SparkConf,
@@ -173,10 +175,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
- blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+ serializerManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
- val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+ val serializedBlock = serializerManager.dataSerialize(blockId, countIterator)
numRecords = countIterator.count
serializedBlock
case ByteBufferBlock(byteBuffer) =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index e41fd11963..4fb0f8caac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -60,7 +60,7 @@ private[streaming] class ReceiverSupervisorImpl(
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
- new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
+ new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 122ca0627f..4e77cd6347 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -60,6 +60,7 @@ class ReceivedBlockHandlerSuite
val mapOutputTracker = new MapOutputTrackerMaster(conf)
val shuffleManager = new HashShuffleManager(conf)
val serializer = new KryoSerializer(conf)
+ var serializerManager = new SerializerManager(serializer, conf)
val manualClock = new ManualClock
val blockManagerSize = 10000000
val blockManagerBuffer = new ArrayBuffer[BlockManager]()
@@ -156,7 +157,7 @@ class ReceivedBlockHandlerSuite
val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
val bytes = reader.read(fileSegment)
reader.close()
- blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
+ serializerManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
}
loggedData shouldEqual data
}
@@ -265,7 +266,6 @@ class ReceivedBlockHandlerSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
@@ -335,7 +335,8 @@ class ReceivedBlockHandlerSuite
}
}
- def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator)
+ def dataToByteBuffer(b: Seq[String]) =
+ serializerManager.dataSerialize(generateBlockId, b.iterator)
val blocks = data.grouped(10).toSeq
@@ -367,8 +368,8 @@ class ReceivedBlockHandlerSuite
/** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1)
- val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
- storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+ val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, serializerManager,
+ 1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
try {
body(receivedBlockHandler)
} finally {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index c4bf42d0f2..ce5a6e00fb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils
@@ -39,6 +40,7 @@ class WriteAheadLogBackedBlockRDDSuite
var sparkContext: SparkContext = null
var blockManager: BlockManager = null
+ var serializerManager: SerializerManager = null
var dir: File = null
override def beforeEach(): Unit = {
@@ -58,6 +60,7 @@ class WriteAheadLogBackedBlockRDDSuite
super.beforeAll()
sparkContext = new SparkContext(conf)
blockManager = sparkContext.env.blockManager
+ serializerManager = sparkContext.env.serializerManager
}
override def afterAll(): Unit = {
@@ -65,6 +68,8 @@ class WriteAheadLogBackedBlockRDDSuite
try {
sparkContext.stop()
System.clearProperty("spark.driver.port")
+ blockManager = null
+ serializerManager = null
} finally {
super.afterAll()
}
@@ -107,8 +112,6 @@ class WriteAheadLogBackedBlockRDDSuite
* It can also test if the partitions that were read from the log were again stored in
* block manager.
*
- *
- *
* @param numPartitions Number of partitions in RDD
* @param numPartitionsInBM Number of partitions to write to the BlockManager.
* Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
@@ -223,7 +226,7 @@ class WriteAheadLogBackedBlockRDDSuite
require(blockData.size === blockIds.size)
val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
val segments = blockData.zip(blockIds).map { case (data, id) =>
- writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer)
+ writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer)
}
writer.close()
segments