author    Tathagata Das <tathagata.das1565@gmail.com>  2012-11-09 15:46:15 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2012-11-09 15:46:15 -0800
commit    9915989bfa242a6f82a7b847ad25e434067da5cf
tree      8832dee091d49de3ebed7d351e5cca1094cd5093
parent    de00bc63dbc8db334f28fcb428e578919a9df7a1
Incorporated Matei's suggestions. Tested with 5 producer (consumer) threads, each doing 50k puts (gets); the run took 15 minutes with no errors or deadlocks.
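For context on what the test exercises: each producer pushes blocks into the BlockManager and hands the (blockId, contents) pair to a matching consumer over a bounded queue, and the consumer reads the block back and checks it. Below is a minimal, self-contained sketch of that handshake. It uses a plain concurrent map (store) in place of the BlockManager, and the verification step is an assumption about what the consumer does, since the patch does not show the consumer's body.

import java.util.concurrent.ArrayBlockingQueue
import scala.collection.concurrent.TrieMap
import scala.util.Random

object ProducerConsumerSketch {
  // Stand-in for the BlockManager: blockId -> block contents. In the real test
  // the block goes through the BlockManager (serialization, possibly disk),
  // which is what makes the comparison on the consumer side meaningful.
  val store = new TrieMap[String, Seq[Int]]()
  // Bounded hand-off queue, as in ThreadingTest.
  val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
  val numBlocks = 1000

  val producer = new Thread {
    override def run(): Unit = {
      for (i <- 1 to numBlocks) {
        val blockId = "b-" + i
        val block = (1 to Random.nextInt(1000)).map(_ => Random.nextInt())
        store.put(blockId, block)    // "put" the block
        queue.put((blockId, block))  // tell the consumer what to expect
      }
    }
  }

  val consumer = new Thread {
    override def run(): Unit = {
      for (_ <- 1 to numBlocks) {
        val (blockId, expected) = queue.take()
        assert(store.get(blockId) == Some(expected), "Mismatch for " + blockId)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    producer.start(); consumer.start()
    producer.join(); consumer.join()
    println("All " + numBlocks + " blocks verified.")
  }
}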
-rw-r--r--  core/src/main/scala/spark/storage/MemoryStore.scala   |  2
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala | 20
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 241200c07f..02098b82fe 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -214,7 +214,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// This should never be null as only one thread should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
- if (entries != null) {
+ if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
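The one-line MemoryStore change moves the null check from the entries map to the entry that was just looked up, which is what the surrounding comment intends: a java.util.Map returns null for a missing key, so guarding on the map itself never protects the later dereference. Below is a minimal sketch of the corrected pattern, using hypothetical stand-ins (Entry, entries, dropBlock) rather than MemoryStore's actual fields.

import java.util.LinkedHashMap

object DropBlockSketch {
  // Hypothetical stand-ins for illustration only, not MemoryStore's real types.
  case class Entry(value: AnyRef, deserialized: Boolean)
  private val entries = new LinkedHashMap[String, Entry]()

  def dropBlock(blockId: String): Unit = {
    // get() returns null when the block is absent, so the guard must test the
    // looked-up entry, not the (always non-null) map it came from.
    val entry = entries.synchronized { entries.get(blockId) }
    if (entry != null) {
      println("Dropping " + blockId + " (deserialized = " + entry.deserialized + ")")
      entries.synchronized { entries.remove(blockId) }
    }
  }
}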
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 13e2f20e64..e4a5b8ffdf 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -16,7 +16,7 @@ import util.Random
private[spark] object ThreadingTest {
val numProducers = 5
- val numBlocksPerProducer = 10000
+ val numBlocksPerProducer = 20000
private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
@@ -26,7 +26,7 @@ private[spark] object ThreadingTest {
val blockId = "b-" + id + "-" + i
val blockSize = Random.nextInt(1000)
val block = (1 to blockSize).map(_ => Random.nextInt())
- val level = if (Random.nextBoolean()) StorageLevel.MEMORY_ONLY_SER else StorageLevel.MEMORY_AND_DISK
+ val level = randomLevel()
val startTime = System.currentTimeMillis()
manager.put(blockId, block.iterator, level, true)
println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
@@ -34,9 +34,21 @@ private[spark] object ThreadingTest {
}
println("Producer thread " + id + " terminated")
}
+
+ def randomLevel(): StorageLevel = {
+ math.abs(Random.nextInt()) % 4 match {
+ case 0 => StorageLevel.MEMORY_ONLY
+ case 1 => StorageLevel.MEMORY_ONLY_SER
+ case 2 => StorageLevel.MEMORY_AND_DISK
+ case 3 => StorageLevel.MEMORY_AND_DISK_SER
+ }
+ }
}
- private[spark] class ConsumerThread(manager: BlockManager, queue: ArrayBlockingQueue[(String, Seq[Int])]) extends Thread {
+ private[spark] class ConsumerThread(
+ manager: BlockManager,
+ queue: ArrayBlockingQueue[(String, Seq[Int])]
+ ) extends Thread {
var numBlockConsumed = 0
override def run() {
@@ -73,5 +85,7 @@ private[spark] object ThreadingTest {
blockManagerMaster.stop()
actorSystem.shutdown()
actorSystem.awaitTermination()
+ println("Everything stopped.")
+ println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
}
}
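One small design note on the new randomLevel() helper (an observation, not part of the patch): Random.nextInt(4) already returns a uniform value in 0 to 3, so the same selection can be written without the abs/modulo step. A drop-in sketch of that alternative for the method shown above, relying on the Random and StorageLevel already in scope in ThreadingTest.scala:

def randomLevel(): StorageLevel = Random.nextInt(4) match {
  // Random.nextInt(4) is always in [0, 3], so every case is reachable.
  case 0 => StorageLevel.MEMORY_ONLY
  case 1 => StorageLevel.MEMORY_ONLY_SER
  case 2 => StorageLevel.MEMORY_AND_DISK
  case 3 => StorageLevel.MEMORY_AND_DISK_SER
}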