From 3de24ae2ed6c58fc96a7e50832afe42fe7af34fb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Mar 2016 10:15:23 -0700 Subject: [SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`. - The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`. - `BlockInfoManager `is now passed directly to classes that need it, rather than being passed via the `BlockManager`. - The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests. - Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`. Author: Josh Rosen Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling. --- .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 3 ++- .../spark/streaming/receiver/ReceivedBlockHandler.scala | 6 ++++-- .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 2 +- .../apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 11 ++++++----- .../streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 9 ++++++--- 5 files changed, 19 insertions(+), 12 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index ace67a639c..c56520b1e2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -115,6 +115,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( assertValid() val hadoopConf = broadcastedHadoopConf.value val blockManager = SparkEnv.get.blockManager + val serializerManager = SparkEnv.get.serializerManager val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] val blockId = partition.blockId @@ -161,7 +162,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } - blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead)) + serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead)) .asInstanceOf[Iterator[T]] } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 6d4f4b99c1..85350ff658 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.serializer.SerializerManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._ import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils} @@ -123,6 +124,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult( */ private[streaming] class WriteAheadLogBasedBlockHandler( blockManager: BlockManager, + serializerManager: SerializerManager, streamId: Int, storageLevel: StorageLevel, conf: SparkConf, @@ -173,10 +175,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler( val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) - blockManager.dataSerialize(blockId, arrayBuffer.iterator) + serializerManager.dataSerialize(blockId, arrayBuffer.iterator) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) - val serializedBlock = blockManager.dataSerialize(blockId, countIterator) + val serializedBlock = serializerManager.dataSerialize(blockId, countIterator) numRecords = countIterator.count serializedBlock case ByteBufferBlock(byteBuffer) => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index e41fd11963..4fb0f8caac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -60,7 +60,7 @@ private[streaming] class ReceiverSupervisorImpl( "Please use streamingContext.checkpoint() to set the checkpoint directory. " + "See documentation for more details.") } - new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, + new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId, receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) } else { new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 122ca0627f..4e77cd6347 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -60,6 +60,7 @@ class ReceivedBlockHandlerSuite val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) val serializer = new KryoSerializer(conf) + var serializerManager = new SerializerManager(serializer, conf) val manualClock = new ManualClock val blockManagerSize = 10000000 val blockManagerBuffer = new ArrayBuffer[BlockManager]() @@ -156,7 +157,7 @@ class ReceivedBlockHandlerSuite val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) val bytes = reader.read(fileSegment) reader.close() - blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList + serializerManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList } loggedData shouldEqual data } @@ -265,7 +266,6 @@ class ReceivedBlockHandlerSuite name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val serializerManager = new SerializerManager(serializer, conf) val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memManager.setMemoryStore(blockManager.memoryStore) @@ -335,7 +335,8 @@ class ReceivedBlockHandlerSuite } } - def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator) + def dataToByteBuffer(b: Seq[String]) = + serializerManager.dataSerialize(generateBlockId, b.iterator) val blocks = data.grouped(10).toSeq @@ -367,8 +368,8 @@ class ReceivedBlockHandlerSuite /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */ private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) { require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1) - val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1, - storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock) + val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, serializerManager, + 1, storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock) try { body(receivedBlockHandler) } finally { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index c4bf42d0f2..ce5a6e00fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.serializer.SerializerManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter} import org.apache.spark.util.Utils @@ -39,6 +40,7 @@ class WriteAheadLogBackedBlockRDDSuite var sparkContext: SparkContext = null var blockManager: BlockManager = null + var serializerManager: SerializerManager = null var dir: File = null override def beforeEach(): Unit = { @@ -58,6 +60,7 @@ class WriteAheadLogBackedBlockRDDSuite super.beforeAll() sparkContext = new SparkContext(conf) blockManager = sparkContext.env.blockManager + serializerManager = sparkContext.env.serializerManager } override def afterAll(): Unit = { @@ -65,6 +68,8 @@ class WriteAheadLogBackedBlockRDDSuite try { sparkContext.stop() System.clearProperty("spark.driver.port") + blockManager = null + serializerManager = null } finally { super.afterAll() } @@ -107,8 +112,6 @@ class WriteAheadLogBackedBlockRDDSuite * It can also test if the partitions that were read from the log were again stored in * block manager. * - * - * * @param numPartitions Number of partitions in RDD * @param numPartitionsInBM Number of partitions to write to the BlockManager. * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager @@ -223,7 +226,7 @@ class WriteAheadLogBackedBlockRDDSuite require(blockData.size === blockIds.size) val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => - writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer) + writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer) } writer.close() segments -- cgit v1.2.3