diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-09 15:46:15 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-09 15:46:15 -0800 |
commit | 9915989bfa242a6f82a7b847ad25e434067da5cf (patch) | |
tree | 8832dee091d49de3ebed7d351e5cca1094cd5093 /core | |
parent | de00bc63dbc8db334f28fcb428e578919a9df7a1 (diff) | |
download | spark-9915989bfa242a6f82a7b847ad25e434067da5cf.tar.gz spark-9915989bfa242a6f82a7b847ad25e434067da5cf.tar.bz2 spark-9915989bfa242a6f82a7b847ad25e434067da5cf.zip |
Incorporated Matei's suggestions. Tested with 5 producer(consumer) threads each doing 50k puts (gets), took 15 minutes to run, no errors or deadlocks.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/storage/MemoryStore.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/ThreadingTest.scala | 20 |
2 files changed, 18 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 241200c07f..02098b82fe 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -214,7 +214,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // This should never be null as only one thread should be dropping // blocks and removing entries. However the check is still here for // future safety. - if (entries != null) { + if (entry != null) { val data = if (entry.deserialized) { Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) } else { diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala index 13e2f20e64..e4a5b8ffdf 100644 --- a/core/src/main/scala/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -16,7 +16,7 @@ import util.Random private[spark] object ThreadingTest { val numProducers = 5 - val numBlocksPerProducer = 10000 + val numBlocksPerProducer = 20000 private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { val queue = new ArrayBlockingQueue[(String, Seq[Int])](100) @@ -26,7 +26,7 @@ private[spark] object ThreadingTest { val blockId = "b-" + id + "-" + i val blockSize = Random.nextInt(1000) val block = (1 to blockSize).map(_ => Random.nextInt()) - val level = if (Random.nextBoolean()) StorageLevel.MEMORY_ONLY_SER else StorageLevel.MEMORY_AND_DISK + val level = randomLevel() val startTime = System.currentTimeMillis() manager.put(blockId, block.iterator, level, true) println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") @@ -34,9 +34,21 @@ private[spark] object ThreadingTest { } println("Producer thread " + id + " terminated") } + + def randomLevel(): StorageLevel = { + math.abs(Random.nextInt()) % 4 match { + case 0 => StorageLevel.MEMORY_ONLY + case 1 => StorageLevel.MEMORY_ONLY_SER + case 2 => StorageLevel.MEMORY_AND_DISK + case 3 => StorageLevel.MEMORY_AND_DISK_SER + } + } } - private[spark] class ConsumerThread(manager: BlockManager, queue: ArrayBlockingQueue[(String, Seq[Int])]) extends Thread { + private[spark] class ConsumerThread( + manager: BlockManager, + queue: ArrayBlockingQueue[(String, Seq[Int])] + ) extends Thread { var numBlockConsumed = 0 override def run() { @@ -73,5 +85,7 @@ private[spark] object ThreadingTest { blockManagerMaster.stop() actorSystem.shutdown() actorSystem.awaitTermination() + println("Everything stopped.") + println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.") } } |