 core/src/main/scala/spark/CacheTracker.scala          | 13 +-
 core/src/main/scala/spark/storage/BlockManager.scala  |  2 +-
 project/SparkBuild.scala                              | 58 +-
 3 files changed, 13 insertions(+), 60 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 19870408d3..22110832f8 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -222,11 +222,16 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
try {
- val values = new ArrayBuffer[Any]
- values ++= rdd.compute(split)
- blockManager.put(key, values.iterator, storageLevel, false)
+ // BlockManager will iterate over results from compute to create RDD
+ blockManager.put(key, rdd.compute(split), storageLevel, false)
//future.apply() // Wait for the reply from the cache tracker
- return values.iterator.asInstanceOf[Iterator[T]]
+ blockManager.get(key) match {
+ case Some(values) =>
+ return values.asInstanceOf[Iterator[T]]
+ case None =>
+ logWarning("loading partition failed after computing it " + key)
+ return null
+ }
} finally {
loading.synchronized {
loading.remove(key)
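
Note: the hunk above hands the iterator from rdd.compute(split) straight to BlockManager.put and then re-reads the block with BlockManager.get, instead of first materializing the partition into a local ArrayBuffer. A minimal sketch of that get-or-compute flow, using a plain in-memory map in place of Spark's BlockManager; CacheStore, getOrCompute, and the key string are illustrative stand-ins, not Spark API.

import scala.collection.mutable

// Simplified stand-in for BlockManager: stores a fully materialized copy of
// each block and serves fresh iterators over it on demand.
class CacheStore {
  private val blocks = mutable.Map.empty[String, Vector[Any]]

  // Materializes the iterator exactly once, as BlockManager.put does when caching.
  def put(key: String, values: Iterator[Any]): Unit =
    blocks(key) = values.toVector

  // Returns a new iterator over the cached values, if the block is present.
  def get(key: String): Option[Iterator[Any]] =
    blocks.get(key).map(_.iterator)
}

object GetOrComputeSketch {
  // Mirrors the patched CacheTracker logic: pass the computed values to the
  // store, then read the block back so the caller iterates over what was cached.
  def getOrCompute[T](store: CacheStore, key: String, compute: () => Iterator[T]): Iterator[T] = {
    store.put(key, compute())
    store.get(key) match {
      case Some(values) => values.asInstanceOf[Iterator[T]]
      case None         => Iterator.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new CacheStore
    val result = getOrCompute(store, "rdd_0_0", () => Iterator(1, 2, 3))
    println(result.mkString(", "))  // prints: 1, 2, 3
  }
}
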
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 5067601198..cde74e5805 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -580,6 +580,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
def getMaxMemoryFromSystemProperties(): Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
- (Runtime.getRuntime.totalMemory * memoryFraction).toLong
+ (Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
}
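
Note: the one-line fix above switches the storage budget from Runtime.totalMemory (the heap the JVM has allocated so far, which can sit well below the -Xmx limit) to Runtime.maxMemory (the -Xmx limit itself). A standalone sketch of the same computation; the property name and 0.66 default are copied from the patch, the object and method names are illustrative.

object MemoryFractionSketch {
  // Same computation as the patched BlockManager helper.
  def maxStorageMemory(): Long = {
    val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    val rt = Runtime.getRuntime
    // On a JVM started with -Xms64m -Xmx1g, totalMemory can report ~64 MB at
    // startup while maxMemory reports ~1 GB, so sizing the cache from
    // totalMemory would badly undersize it; maxMemory reflects the real cap.
    println(s"totalMemory = ${rt.totalMemory()} bytes, maxMemory = ${rt.maxMemory()} bytes")
    println(s"storage budget = ${maxStorageMemory()} bytes")
  }
}
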
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a244f9c229..81e72be58d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1,7 +1,5 @@
import sbt._
import Keys._
-import classpath.ClasspathUtilities.isArchive
-import java.io.FileOutputStream
import sbtassembly.Plugin._
import AssemblyKeys._
import twirl.sbt.TwirlPlugin._
@@ -72,12 +70,12 @@ object SparkBuild extends Build {
"cc.spray" % "spray-can" % "1.0-M2.1",
"cc.spray" % "spray-server" % "1.0-M2.1"
)
- ) ++ assemblySettings ++ extraAssemblySettings ++ mergeSettings ++ Twirl.settings
+ ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
- ) ++ assemblySettings ++ extraAssemblySettings ++ mergeSettings
+ ) ++ assemblySettings ++ extraAssemblySettings
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples"
@@ -85,60 +83,10 @@ object SparkBuild extends Build {
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
- // Fix for "No configuration setting found for key 'akka.version'" exception
- // when running Spark from the jar generated by the "assembly" task; see
- // http://letitcrash.com/post/21025950392/howto-sbt-assembly-vs-reference-conf
- lazy val merge = TaskKey[File]("merge-reference",
- "merge all reference.conf")
-
- lazy val mergeSettings: Seq[Project.Setting[_]] = Seq(
- merge <<= (fullClasspath in assembly) map {
- c =>
- // collect from all elements of the full classpath
- val (libs, dirs) =
- c map (_.data) partition (isArchive)
- // goal is to simply concatenate files here
- val dest = file("reference.conf")
- val out = new FileOutputStream(dest)
- val append = IO.transfer(_: File, out)
- try {
- // first collect from managed sources
- (dirs * "reference.conf").get foreach append
- // then from dependency jars by unzipping and
- // collecting reference.conf if present
- for (lib <- libs) {
- IO withTemporaryDirectory {
- dir =>
- IO.unzip(lib, dir, "reference.conf")
- (dir * "reference.conf").get foreach append
- }
- }
- // return merged file location as task result
- dest
- } finally {
- out.close()
- }
- },
-
- // get rid of the individual files from jars
- excludedFiles in assembly <<=
- (excludedFiles in assembly) {
- (old) => (bases) =>
- old(bases) ++ (bases flatMap (base =>
- (base / "reference.conf").get))
- },
-
- // tell sbt-assembly to include our merged file
- assembledMappings in assembly <<=
- (assembledMappings in assembly, merge) map {
- (old, merged) => (f) =>
- old(f) :+(merged, "reference.conf")
- }
- )
-
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
+ case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
)
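
Note: the build change above deletes the hand-rolled merge-reference task and instead has sbt-assembly concatenate every reference.conf it finds, which is what Akka needs to avoid the "No configuration setting found for key 'akka.version'" error. A minimal sketch of how a module could pull the resulting settings together, written against the sbt-assembly 0.8.x-era keys used in this build file; SparkBuild's real module wiring is not reproduced and the object name is illustrative.

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

object AssemblyConcatSketch {
  // Skip tests during assembly and resolve duplicate files across jars:
  // drop manifests, concatenate reference.conf fragments, and take the
  // first copy of anything else.
  lazy val extraAssemblySettings = Seq(test in assembly := {}) ++ Seq(
    mergeStrategy in assembly := {
      case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
      case "reference.conf"                           => MergeStrategy.concat
      case _                                          => MergeStrategy.first
    }
  )

  // A module would mix these in alongside its own settings, e.g.:
  //   def coreSettings = sharedSettings ++ assemblySettings ++ extraAssemblySettings
}
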