aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/SizeEstimator.scala10
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala8
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala6
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala4
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala4
-rw-r--r--core/src/test/scala/spark/FileSuite.scala16
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala8
-rw-r--r--repl/src/test/scala/spark/repl/ReplSuite.scala23
8 files changed, 64 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
index aadd475868..71b9c1f62a 100644
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ b/core/src/main/scala/spark/SizeEstimator.scala
@@ -77,10 +77,10 @@ object SizeEstimator extends Logging {
return System.getProperty("spark.test.useCompressedOops").toBoolean
}
try {
- val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic";
- val server = ManagementFactory.getPlatformMBeanServer();
+ val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
+ val server = ManagementFactory.getPlatformMBeanServer()
val bean = ManagementFactory.newPlatformMXBeanProxy(server,
- hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean]);
+ hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
return bean.getVMOption("UseCompressedOops").getValue.toBoolean
} catch {
case e: Exception => {
@@ -142,6 +142,10 @@ object SizeEstimator extends Logging {
val cls = obj.getClass
if (cls.isArray) {
visitArray(obj, cls, state)
+ } else if (classOf[ClassLoader].isAssignableFrom(cls) || classOf[Class].isAssignableFrom(cls)) {
+ // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
+ // the size estimator since it references the whole REPL. Do nothing in this case. In
+ // general all ClassLoaders and Classes will be shared between objects anyway.
} else {
val classInfo = getClassInfo(cls)
state.size += classInfo.shellSize
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 0bb9937992..0b9647d168 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -18,7 +18,7 @@ extends Broadcast[T] with Logging with Serializable {
MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(
- uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+ uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
}
@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -53,7 +53,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must call this after all the variables have been created/initialized
if (!isLocal) {
- sendBroadcast
+ sendBroadcast()
}
def sendBroadcast() {
@@ -119,7 +119,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("Started reading broadcast variable " + uuid)
// Initializing everything because Master will only send null/0 values
// Only the 1st worker in a node can be here. Others will get from cache
- initializeWorkerVariables
+ initializeWorkerVariables()
logInfo("Local host address: " + hostAddress)
@@ -135,7 +135,7 @@ extends Broadcast[T] with Logging with Serializable {
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(
- uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+ uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
} else {
logError("Reading Broadcasted variable " + uuid + " failed")
}
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 64037fb2d5..f5f2b3dbf2 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -19,7 +19,7 @@ extends Broadcast[T] with Logging with Serializable {
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
- uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+ uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
}
if (!isLocal) {
@@ -37,7 +37,7 @@ extends Broadcast[T] with Logging with Serializable {
val start = System.nanoTime
value_ = HttpBroadcast.read[T](uuid)
SparkEnv.get.blockManager.putSingle(
- uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+ uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
val time = (System.nanoTime - start) / 1e9
logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
}
@@ -80,8 +80,8 @@ private object HttpBroadcast extends Logging {
if (server != null) {
server.stop()
server = null
- initialized = false
}
+ initialized = false
}
}
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 3b54d570be..574477a5fc 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -17,7 +17,7 @@ extends Broadcast[T] with Logging with Serializable {
MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(
- uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+ uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
}
@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -112,7 +112,7 @@ extends Broadcast[T] with Logging with Serializable {
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(
- uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+ uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
} else {
logError("Reading Broadcasted variable " + uuid + " failed")
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 224c55d9d7..5384274d65 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -545,9 +545,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
if (level.useMemory) {
+ bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
}
if (level.useDisk) {
+ bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
@@ -639,7 +641,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
return
}
- memoryStore.remove(blockId)
+ memoryStore.remove(blockId)
val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
setLevelAndTellMaster(blockId, newLevel)
}
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 17c7a8de43..5c1577ed0b 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -1,6 +1,6 @@
package spark
-import java.io.File
+import java.io.{FileWriter, PrintWriter, File}
import scala.io.Source
@@ -142,4 +142,18 @@ class FileSuite extends FunSuite with BeforeAndAfter {
sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}
+
+ test("file caching") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val out = new FileWriter(tempDir + "/input")
+ out.write("Hello world!\n")
+ out.write("What's up?\n")
+ out.write("Goodbye\n")
+ out.close()
+ val rdd = sc.textFile(tempDir + "/input").cache()
+ assert(rdd.count() === 3)
+ assert(rdd.count() === 3)
+ assert(rdd.count() === 3)
+ }
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index a1fe63beaf..ade457c0f9 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -72,6 +72,14 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
}
+ test("basic caching") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ assert(rdd.collect().toList === List(1, 2, 3, 4))
+ }
+
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index 15ebf0c9b8..0b5d439ca4 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -7,6 +7,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import org.scalatest.FunSuite
+import com.google.common.io.Files
class ReplSuite extends FunSuite {
def runInterpreter(master: String, input: String): String = {
@@ -118,7 +119,27 @@ class ReplSuite extends FunSuite {
assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
}
-
+
+ test ("interacting with files") {
+ val tempDir = Files.createTempDir()
+ val out = new FileWriter(tempDir + "/input")
+ out.write("Hello world!\n")
+ out.write("What's up?\n")
+ out.write("Goodbye\n")
+ out.close()
+ val output = runInterpreter("local", """
+ var file = sc.textFile("%s/input").cache()
+ file.count()
+ file.count()
+ file.count()
+ """.format(tempDir.getAbsolutePath))
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Long = 3", output)
+ assertContains("res1: Long = 3", output)
+ assertContains("res2: Long = 3", output)
+ }
+
if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
test ("running on Mesos") {
val output = runInterpreter("localquiet", """