aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-11-04 21:02:36 -0800
committerReynold Xin <rxin@apache.org>2013-11-04 21:02:36 -0800
commit551a43fd3dfe24beed12961050b58aa0c0379b4c (patch)
tree6fd3a8a66b4efce4aa81bae444acc0d5eea1bce1
parent99bfcc91e010ba29852ec7dd0b4270805b7b2377 (diff)
parent7a26104ab7cb492b347ba761ef1f17ca1b9078e4 (diff)
downloadspark-551a43fd3dfe24beed12961050b58aa0c0379b4c.tar.gz
spark-551a43fd3dfe24beed12961050b58aa0c0379b4c.tar.bz2
spark-551a43fd3dfe24beed12961050b58aa0c0379b4c.zip
Merge branch 'master' of github.com:apache/incubator-spark into mergemerge
Conflicts: README.md core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
-rw-r--r--README.md20
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala49
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskStore.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala189
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/BitSet.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/BitSet.scala)20
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala)13
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala)36
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala)20
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala51
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala84
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala)2
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala)18
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala145
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala)10
-rw-r--r--core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala74
-rw-r--r--docs/cluster-overview.md14
-rw-r--r--docs/ec2-scripts.md2
-rwxr-xr-xec2/spark_ec2.py2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala3
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala5
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala39
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala13
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/package.scala4
36 files changed, 641 insertions, 344 deletions
diff --git a/README.md b/README.md
index ba31ed586d..7790139c8f 100644
--- a/README.md
+++ b/README.md
@@ -52,9 +52,9 @@ to interactively load, transform, and compute on massive graphs.
## Examples
-Suppose I want to build a graph from some text files, restrict the graph
+Suppose I want to build a graph from some text files, restrict the graph
to important relationships and users, run page-rank on the sub-graph, and
-then finally return attributes associated with the top users. I can do
+then finally return attributes associated with the top users. I can do
all of this in just a few lines with GraphX:
```scala
@@ -69,16 +69,16 @@ val users = sc.textFile("hdfs://user_attributes.tsv")
val followerGraph = Graph.textFile(sc, "hdfs://followers.tsv")
// Attach the user attributes
-val graph = followerGraph.outerJoinVertices(users){
+val graph = followerGraph.outerJoinVertices(users){
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
- case (uid, deg, None) => Array.empty[String]
+ case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users which have exactly two attributes
val subgraph = graph.subgraph((vid, attr) => attr.size == 2)
-// Compute the PageRank
+// Compute the PageRank
val pagerankGraph = Analytics.pagerank(subgraph)
// Get the attributes of the top pagerank users
@@ -86,7 +86,7 @@ val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices){
case (uid, attrList, Some(pr)) => (pr, attrList)
case (uid, attrList, None) => (pr, attrList)
}
-
+
println(userInfoWithPageRank.top(5))
```
@@ -160,10 +160,10 @@ with YARN, also set `SPARK_YARN=true`:
For convenience, these variables may also be set through the
`conf/spark-env.sh` file described below.
-When developing a Spark application, specify the Hadoop version by
-adding the "hadoop-client" artifact to your project's
-dependencies. For example, if you're using Hadoop 1.0.1 and build your
-application using SBT, add this entry to `libraryDependencies`:
+When developing a Spark application, specify the Hadoop version by adding the
+"hadoop-client" artifact to your project's dependencies. For example, if you're
+using Hadoop 1.2.1 and build your application using SBT, add this entry to
+`libraryDependencies`:
"org.apache.hadoop" % "hadoop-client" % "1.2.1"
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 158197ae4d..880b49e8ef 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
-import org.apache.spark.deploy.LocalSparkCluster
+import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -245,7 +245,7 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
- val conf = env.hadoop.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -255,8 +255,10 @@ class SparkContext(
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
+ Utils.getSystemProperties.foreach { case (key, value) =>
+ if (key.startsWith("spark.hadoop.")) {
+ conf.set(key.substring("spark.hadoop.".length), value)
+ }
}
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
@@ -379,7 +381,7 @@ class SparkContext(
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
- SparkEnv.get.hadoop.addCredentials(conf)
+ SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
@@ -589,7 +591,8 @@ class SparkContext(
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
- case _ => path
+ case "local" => "file:" + uri.getPath
+ case _ => path
}
addedFiles(key) = System.currentTimeMillis
@@ -698,7 +701,7 @@ class SparkContext(
key = uri.getScheme match {
// A JAR file which exists only on the driver node
case null | "file" =>
- if (env.hadoop.isYarnMode()) {
+ if (SparkHadoopUtil.get.isYarnMode()) {
// In order for this to work on yarn the user must specify the --addjars option to
// the client to upload the file into the distributed cache to make it show up in the
// current working directory.
@@ -936,9 +939,8 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
- val env = SparkEnv.get
val path = new Path(dir)
- val fs = path.getFileSystem(env.hadoop.newConfiguration())
+ val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index aaab717bcf..ff2df8fb6a 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
import org.apache.spark.network.ConnectionManager
import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.api.python.PythonWorkerFactory
+import com.google.common.collect.MapMaker
/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
- val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
- if(yarnMode) {
- try {
- Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
- } catch {
- case th: Throwable => throw new SparkException("Unable to load YARN support", th)
- }
- } else {
- new SparkHadoopUtil
- }
- }
+ // A general, soft-reference map for metadata needed during HadoopRDD split computation
+ // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+ private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 83cd3df5fa..6bc846aa92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,17 +20,13 @@ package org.apache.spark.deploy
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
-import com.google.common.collect.MapMaker
-
+import org.apache.spark.SparkException
/**
* Contains util methods to interact with Hadoop from Spark.
*/
private[spark]
class SparkHadoopUtil {
- // A general, soft-reference map for metadata needed during HadoopRDD split computation
- // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
- private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
def addCredentials(conf: JobConf) {}
def isYarnMode(): Boolean = { false }
-
+}
+
+object SparkHadoopUtil {
+ private val hadoop = {
+ val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if (yarnMode) {
+ try {
+ Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ } catch {
+ case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+ }
+ } else {
+ new SparkHadoopUtil
+ }
+ }
+
+ def get: SparkHadoopUtil = {
+ hadoop
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index ccaaecb85b..d3033ea4a6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rdd
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+ val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val env = SparkEnv.get
- val fs = path.getFileSystem(env.hadoop.newConfiguration())
+ val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(env.hadoop.newConfiguration())
+ val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fad042c7ae..32901a508f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
*/
- def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+ def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
- def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+ def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
def putCachedMetadata(key: String, value: Any) =
- SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
+ SparkEnv.get.hadoopJobMetadata.put(key, value)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 370ccd183c..1791ee660d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
- val env = SparkEnv.get
val conf = new JobConf(configuration)
- env.hadoop.addCredentials(conf)
+ SparkHadoopUtil.get.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
- val env = SparkEnv.get
val jobConf = new JobConf(configuration)
- env.hadoop.addCredentials(jobConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 24d97da6eb..1dc71a0428 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -146,26 +146,26 @@ private[spark] class ShuffleMapTask(
metrics = Some(context.taskMetrics)
val blockManager = SparkEnv.get.blockManager
- var shuffle: ShuffleBlocks = null
- var buckets: ShuffleWriterGroup = null
+ val shuffleBlockManager = blockManager.shuffleBlockManager
+ var shuffle: ShuffleWriterGroup = null
+ var success = false
try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
- shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
- buckets = shuffle.acquireWriters(partitionId)
+ shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
- buckets.writers(bucketId).write(pair)
+ shuffle.writers(bucketId).write(pair)
}
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
- val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
+ val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
val size = writer.fileSegment().length
totalBytes += size
@@ -179,19 +179,20 @@ private[spark] class ShuffleMapTask(
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
+ success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
- if (buckets != null) {
- buckets.writers.foreach(_.revertPartialWrites())
+ if (shuffle != null) {
+ shuffle.writers.foreach(_.revertPartialWrites())
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
- if (shuffle != null && buckets != null) {
- buckets.writers.foreach(_.close())
- shuffle.releaseWriters(buckets)
+ if (shuffle != null && shuffle.writers != null) {
+ shuffle.writers.foreach(_.close())
+ shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
context.executeOnCompleteCallbacks()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index 55f8313e87..53bf78267e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -175,7 +175,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
reason.className, reason.description, locs.mkString("\n")))
if (numFailures(index) > MAX_TASK_FAILURES) {
val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
- taskSet.id, index, 4, reason.description)
+ taskSet.id, index, MAX_TASK_FAILURES, reason.description)
decreaseRunningTasks(runningTasks)
sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 76d537f8e8..fbedfbc446 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.io.{InputStream, OutputStream}
+import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{HashMap, ArrayBuffer}
@@ -47,7 +47,7 @@ private[spark] class BlockManager(
extends Logging {
val shuffleBlockManager = new ShuffleBlockManager(this)
- val diskBlockManager = new DiskBlockManager(
+ val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -462,15 +462,11 @@ private[spark] class BlockManager(
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
- def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int)
+ def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
- val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => {
- if (shuffleBlockManager.consolidateShuffleFiles) {
- diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
- }
val myInfo = new ShuffleBlockInfo()
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 32d2dd0694..e49c191c70 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -78,11 +78,11 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
class DiskBlockObjectWriter(
- blockId: BlockId,
- file: File,
- serializer: Serializer,
- bufferSize: Int,
- compressStream: OutputStream => OutputStream)
+ blockId: BlockId,
+ file: File,
+ serializer: Serializer,
+ bufferSize: Int,
+ compressStream: OutputStream => OutputStream)
extends BlockObjectWriter(blockId)
with Logging
{
@@ -111,8 +111,8 @@ class DiskBlockObjectWriter(
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
- private var initialPosition = 0L
- private var lastValidPosition = 0L
+ private val initialPosition = file.length()
+ private var lastValidPosition = initialPosition
private var initialized = false
private var _timeWriting = 0L
@@ -120,7 +120,6 @@ class DiskBlockObjectWriter(
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
- initialPosition = channel.position
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index bcb58ad946..fcd2e97982 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,12 +20,11 @@ package org.apache.spark.storage
import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random}
-import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -35,7 +34,8 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
*
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
-private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
+private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
+ extends PathResolver with Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
@@ -47,54 +47,23 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null
- // Stores only Blocks which have been specifically mapped to segments of files
- // (rather than the default, which maps a Block to a whole file).
- // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks.
- private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment]
-
- val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup)
-
addShutdownHook()
/**
- * Creates a logical mapping from the given BlockId to a segment of a file.
- * This will cause any accesses of the logical BlockId to be directed to the specified
- * physical location.
- */
- def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) {
- blockToFileSegmentMap.put(blockId, fileSegment)
- }
-
- /**
* Returns the phyiscal file segment in which the given BlockId is located.
* If the BlockId has been mapped to a specific FileSegment, that will be returned.
* Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
*/
def getBlockLocation(blockId: BlockId): FileSegment = {
- if (blockToFileSegmentMap.internalMap.containsKey(blockId)) {
- blockToFileSegmentMap.get(blockId).get
+ if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
+ shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
} else {
val file = getFile(blockId.name)
new FileSegment(file, 0, file.length())
}
}
- /**
- * Simply returns a File to place the given Block into. This does not physically create the file.
- * If filename is given, that file will be used. Otherwise, we will use the BlockId to get
- * a unique filename.
- */
- def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = {
- val actualFilename = if (filename == "") blockId.name else filename
- val file = getFile(actualFilename)
- if (!allowAppending && file.exists()) {
- throw new IllegalStateException(
- "Attempted to create file that already exists: " + actualFilename)
- }
- file
- }
-
- private def getFile(filename: String): File = {
+ def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
@@ -119,6 +88,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
new File(subDir, filename)
}
+ def getFile(blockId: BlockId): File = getFile(blockId.name)
+
private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
@@ -151,10 +122,6 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
}
}
- private def cleanup(cleanupTime: Long) {
- blockToFileSegmentMap.clearOldValues(cleanupTime)
- }
-
private def addShutdownHook() {
localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index a3c496f9e0..5a1e7b4444 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -44,7 +44,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
val bytes = _bytes.duplicate()
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
- val file = diskManager.createBlockFile(blockId, allowAppending = false)
+ val file = diskManager.getFile(blockId)
val channel = new FileOutputStream(file).getChannel()
while (bytes.remaining > 0) {
channel.write(bytes)
@@ -64,7 +64,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
- val file = diskManager.createBlockFile(blockId, allowAppending = false)
+ val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
val length = file.length
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 066e45a12b..2f1b049ce4 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -17,33 +17,45 @@
package org.apache.spark.storage
+import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConversions._
+
import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
-private[spark]
-class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+private[spark] trait ShuffleWriterGroup {
+ val writers: Array[BlockObjectWriter]
-private[spark]
-trait ShuffleBlocks {
- def acquireWriters(mapId: Int): ShuffleWriterGroup
- def releaseWriters(group: ShuffleWriterGroup)
+ /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
+ def releaseWriters(success: Boolean)
}
/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
- * per reducer.
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
*
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
* blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
- * it releases them for another task.
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
+ * files, it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
* - shuffleId: The unique id given to the entire shuffle stage.
* - bucketId: The id of the output partition (i.e., reducer id)
* - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
* time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
+ * each block stored in each file. In order to find the location of a shuffle block, we search the
+ * files within a ShuffleFileGroups associated with the block's reducer.
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
@@ -52,45 +64,152 @@ class ShuffleBlockManager(blockManager: BlockManager) {
val consolidateShuffleFiles =
System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
- var nextFileId = new AtomicInteger(0)
- val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
+ private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+
+ /**
+ * Contains all the state related to a particular shuffle. This includes a pool of unused
+ * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
+ */
+ private class ShuffleState() {
+ val nextFileId = new AtomicInteger(0)
+ val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+ val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+ }
+
+ type ShuffleId = Int
+ private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
+
+ private
+ val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
- def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
- new ShuffleBlocks {
- // Get a group of writers for a map task.
- override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
- val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
- val fileId = getUnusedFileId()
- val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+ def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
+ new ShuffleWriterGroup {
+ shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
+ private val shuffleState = shuffleStates(shuffleId)
+ private var fileGroup: ShuffleFileGroup = null
+
+ val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
+ fileGroup = getUnusedFileGroup()
+ Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- if (consolidateShuffleFiles) {
- val filename = physicalFileName(shuffleId, bucketId, fileId)
- blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
- } else {
- blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+ blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
+ }
+ } else {
+ Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+ val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+ val blockFile = blockManager.diskBlockManager.getFile(blockId)
+ blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
+ }
+ }
+
+ override def releaseWriters(success: Boolean) {
+ if (consolidateShuffleFiles) {
+ if (success) {
+ val offsets = writers.map(_.fileSegment().offset)
+ fileGroup.recordMapOutput(mapId, offsets)
}
+ recycleFileGroup(fileGroup)
}
- new ShuffleWriterGroup(mapId, fileId, writers)
}
- override def releaseWriters(group: ShuffleWriterGroup) {
- recycleFileId(group.fileId)
+ private def getUnusedFileGroup(): ShuffleFileGroup = {
+ val fileGroup = shuffleState.unusedFileGroups.poll()
+ if (fileGroup != null) fileGroup else newFileGroup()
+ }
+
+ private def newFileGroup(): ShuffleFileGroup = {
+ val fileId = shuffleState.nextFileId.getAndIncrement()
+ val files = Array.tabulate[File](numBuckets) { bucketId =>
+ val filename = physicalFileName(shuffleId, bucketId, fileId)
+ blockManager.diskBlockManager.getFile(filename)
+ }
+ val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
+ shuffleState.allFileGroups.add(fileGroup)
+ fileGroup
}
- }
- }
- private def getUnusedFileId(): Int = {
- val fileId = unusedFileIds.poll()
- if (fileId == null) nextFileId.getAndIncrement() else fileId
+ private def recycleFileGroup(group: ShuffleFileGroup) {
+ shuffleState.unusedFileGroups.add(group)
+ }
+ }
}
- private def recycleFileId(fileId: Int) {
- if (consolidateShuffleFiles) {
- unusedFileIds.add(fileId)
+ /**
+ * Returns the physical file segment in which the given BlockId is located.
+ * This function should only be called if shuffle file consolidation is enabled, as it is
+ * an error condition if we don't find the expected block.
+ */
+ def getBlockLocation(id: ShuffleBlockId): FileSegment = {
+ // Search all file groups associated with this shuffle.
+ val shuffleState = shuffleStates(id.shuffleId)
+ for (fileGroup <- shuffleState.allFileGroups) {
+ val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
+ if (segment.isDefined) { return segment.get }
}
+ throw new IllegalStateException("Failed to find shuffle block: " + id)
}
private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
"merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
}
+
+ private def cleanup(cleanupTime: Long) {
+ shuffleStates.clearOldValues(cleanupTime)
+ }
+}
+
+private[spark]
+object ShuffleBlockManager {
+ /**
+ * A group of shuffle files, one per reducer.
+ * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
+ */
+ private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+ /**
+ * Stores the absolute index of each mapId in the files of this group. For instance,
+ * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
+ */
+ private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+
+ /**
+ * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
+ * This ordering allows us to compute block lengths by examining the following block offset.
+ * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
+ * reducer.
+ */
+ private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+ new PrimitiveVector[Long]()
+ }
+
+ def numBlocks = mapIdToIndex.size
+
+ def apply(bucketId: Int) = files(bucketId)
+
+ def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+ mapIdToIndex(mapId) = numBlocks
+ for (i <- 0 until offsets.length) {
+ blockOffsetsByReducer(i) += offsets(i)
+ }
+ }
+
+ /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
+ def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
+ val file = files(reducerId)
+ val blockOffsets = blockOffsetsByReducer(reducerId)
+ val index = mapIdToIndex.getOrElse(mapId, -1)
+ if (index >= 0) {
+ val offset = blockOffsets(index)
+ val length =
+ if (index + 1 < numBlocks) {
+ blockOffsets(index + 1) - offset
+ } else {
+ file.length() - offset
+ }
+ assert(length >= 0)
+ Some(new FileSegment(file, offset, length))
+ } else {
+ None
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index 7dcadc3805..1e4db4f66b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -38,19 +38,19 @@ object StoragePerfTester {
val blockManager = sc.env.blockManager
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+ val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer())
- val buckets = shuffle.acquireWriters(mapId)
+ val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
- buckets.writers(i % numOutputSplits).write(writeData)
+ writers(i % numOutputSplits).write(writeData)
}
- buckets.writers.map {w =>
+ writers.map {w =>
w.commit()
total.addAndGet(w.fileSegment().length)
w.close()
}
- shuffle.releaseWriters(buckets)
+ shuffle.releaseWriters(true)
}
val start = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 3f963727d9..67a7f87a5c 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -59,7 +59,7 @@ object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext
"ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
- SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value
+ SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
type MetadataCleanerType = Value
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3b3968c5e..fe932d8ede 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,13 +18,12 @@
package org.apache.spark.util
import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
+import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
@@ -36,7 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkEnv, SparkException, Logging}
+import org.apache.spark.{SparkException, Logging}
/**
@@ -148,7 +147,7 @@ private[spark] object Utils extends Logging {
return buf
}
- private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
+ private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -280,9 +279,8 @@ private[spark] object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
- val env = SparkEnv.get
val uri = new URI(url)
- val conf = env.hadoop.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
@@ -819,4 +817,10 @@ private[spark] object Utils extends Logging {
// Nothing else to guard against ?
hashAbs
}
+
+ /** Returns a copy of the system properties that is thread-safe to iterator over. */
+ def getSystemProperties(): Map[String, String] = {
+ return System.getProperties().clone()
+ .asInstanceOf[java.util.Properties].toMap[String, String]
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 69b10566f3..5e264b48dd 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -57,10 +57,10 @@ class BitSet(numBits: Int) {
assert(newBS.numWords >= numWords)
assert(newBS.numWords >= other.numWords)
var ind = 0
- while( ind < smaller ) {
+ while( ind < smaller ) {
newBS.words(ind) = words(ind) & other.words(ind)
ind += 1
- }
+ }
newBS
}
@@ -75,18 +75,18 @@ class BitSet(numBits: Int) {
assert(newBS.numWords >= other.numWords)
val smaller = math.min(numWords, other.numWords)
var ind = 0
- while( ind < smaller ) {
+ while( ind < smaller ) {
newBS.words(ind) = words(ind) | other.words(ind)
ind += 1
}
- while( ind < numWords ) {
+ while( ind < numWords ) {
newBS.words(ind) = words(ind)
ind += 1
- }
- while( ind < other.numWords ) {
+ }
+ while( ind < other.numWords ) {
newBS.words(ind) = other.words(ind)
ind += 1
- }
+ }
newBS
}
@@ -110,7 +110,7 @@ class BitSet(numBits: Int) {
*/
def get(index: Int): Boolean = {
val bitmask = 1L << (index & 0x3f) // mod 64 and shift
- (words(index >>> 6) & bitmask) != 0 // div by 64 and mask
+ (words(index >> 6) & bitmask) != 0 // div by 64 and mask
}
@@ -181,5 +181,5 @@ class BitSet(numBits: Int) {
/** Return the number of longs it would take to hold numBits. */
- private def bit2words(numBits: Int) = ((numBits - 1) >>> 6) + 1
+ private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
}
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index e53551ced6..1e9faaa5a0 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -34,7 +34,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
/**
* Allocate an OpenHashMap with a fixed initial capacity
*/
- def this(initialCapacity: Int = 64) =
+ def this(initialCapacity: Int = 64) =
this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
/**
@@ -42,7 +42,6 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
*/
def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
-
@transient private var _oldValues: Array[V] = null
// Treat the null key differently so we can use nulls in "data" to represent empty items.
@@ -71,7 +70,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
haveNullValue = true
nullValue = v
} else {
- val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
_values(pos) = v
keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
@@ -88,7 +87,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
nullValue = v
}
} else {
- val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
_values(pos) = mergeF(_values(pos), v)
keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
@@ -111,8 +110,8 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
}
nullValue
} else {
- val pos = keySet.fastAdd(k)
- if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
+ val pos = keySet.addWithoutResize(k)
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue
keySet.rehashIfNeeded(k, grow, move)
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index d083ab26ac..f8d54a8f73 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -43,6 +43,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")
+ require(loadFactor < 1.0, "Load factor must be less than 1.0")
+ require(loadFactor > 0.0, "Load factor must be greater than 0.0")
import OpenHashSet._
@@ -78,11 +80,15 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
protected var _mask = _capacity - 1
protected var _size = 0
- protected var _data = classManifest[T].newArray(_capacity)
protected var _bitset = new BitSet(_capacity)
def getBitSet = _bitset
+ // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
+ // specialization bug that would generate two arrays (one for Object and one for specialized T).
+ protected var _data: Array[T] = _
+ _data = new Array[T](_capacity)
+
/** Number of elements in the set. */
def size: Int = _size
@@ -97,7 +103,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
* and rehash all elements.
*/
def add(k: T) {
- fastAdd(k)
+ addWithoutResize(k)
rehashIfNeeded(k, grow, move)
}
@@ -111,14 +117,14 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
* @return The position where the key is placed, plus the highest order bit is set if the key
* exists previously.
*/
- def fastAdd(k: T): Int = putInto(_bitset, _data, k)
+ def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
/**
* Rehash the set if it is overloaded.
* @param k A parameter unused in the function, but to force the Scala compiler to specialize
* this method.
- * @param allocateFunc Closure invoked when we are allocating a new, larger array.
- * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
+ * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+ * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
* to a new position (in the new data array).
*/
def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
@@ -127,7 +133,9 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
}
}
- /** Return the position of the element in the underlying array. */
+ /**
+ * Return the position of the element in the underlying array, or INVALID_POS if it is not found.
+ */
def getPos(k: T): Int = {
var pos = hashcode(hasher.hash(k)) & _mask
var i = 1
@@ -163,7 +171,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
/**
* Put an entry into the set. Return the position where the key is placed. In addition, the
- * highest bid in the returned position is set if the key exists prior to this put.
+ * highest bit in the returned position is set if the key exists prior to this put.
*
* This function assumes the data array has at least one empty slot.
*/
@@ -177,7 +185,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
data(pos) = k
bitset.set(pos)
_size += 1
- return pos | EXISTENCE_MASK
+ return pos | NONEXISTENCE_MASK
} else if (data(pos) == k) {
// Found an existing key.
return pos
@@ -199,8 +207,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
*
* @param k A parameter unused in the function, but to force the Scala compiler to specialize
* this method.
- * @param allocateFunc Closure invoked when we are allocating a new, larger array.
- * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
+ * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+ * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
* to a new position (in the new data array).
*/
private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
@@ -208,7 +216,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
allocateFunc(newCapacity)
- val newData = classManifest[T].newArray(newCapacity)
+ val newData = new Array[T](newCapacity)
val newBitset = new BitSet(newCapacity)
var pos = 0
_size = 0
@@ -245,9 +253,7 @@ private[spark]
object OpenHashSet {
val INVALID_POS = -1
-
- val EXISTENCE_MASK = 0x80000000
-
+ val NONEXISTENCE_MASK = 0x80000000
val POSITION_MASK = 0xEFFFFFF
/**
diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 08fc74e5da..987077dd8a 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -35,7 +35,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
/**
* Allocate an OpenHashMap with a fixed initial capacity
*/
- def this(initialCapacity: Int = 64) =
+ def this(initialCapacity: Int = 64) =
this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
/**
@@ -55,9 +55,15 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
_values(pos)
}
+ /** Get the value for a given key, or returns elseValue if it doesn't exist. */
+ def getOrElse(k: K, elseValue: V): V = {
+ val pos = keySet.getPos(k)
+ if (pos >= 0) _values(pos) else elseValue
+ }
+
/** Set the value for a key */
def update(k: K, v: V) {
- val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
_values(pos) = v
keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
@@ -66,9 +72,9 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
/** Set the value for a key */
def setMerge(k: K, v: V, mergeF: (V,V) => V) {
- val pos = keySet.fastAdd(k)
+ val pos = keySet.addWithoutResize(k)
val ind = pos & OpenHashSet.POSITION_MASK
- if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // if first add
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
_values(ind) = v
} else {
_values(ind) = mergeF(_values(ind), v)
@@ -85,8 +91,8 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
* @return the newly updated value.
*/
def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
- val pos = keySet.fastAdd(k)
- if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
+ val pos = keySet.addWithoutResize(k)
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue
keySet.rehashIfNeeded(k, grow, move)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
new file mode 100644
index 0000000000..369519c559
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
+private[spark]
+class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
+ private var numElements = 0
+ private var array: Array[V] = _
+
+ // NB: This must be separate from the declaration, otherwise the specialized parent class
+ // will get its own array with the same initial size. TODO: Figure out why...
+ array = new Array[V](initialSize)
+
+ def apply(index: Int): V = {
+ require(index < numElements)
+ array(index)
+ }
+
+ def +=(value: V) {
+ if (numElements == array.length) { resize(array.length * 2) }
+ array(numElements) = value
+ numElements += 1
+ }
+
+ def length = numElements
+
+ def getUnderlyingArray = array
+
+ /** Resizes the array, dropping elements if the total length decreases. */
+ def resize(newLength: Int) {
+ val newArray = new Array[V](newLength)
+ array.copyToArray(newArray)
+ array = newArray
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
new file mode 100644
index 0000000000..0b9056344c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.storage
+
+import java.io.{FileWriter, File}
+
+import scala.collection.mutable
+
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+
+ val rootDir0 = Files.createTempDir()
+ rootDir0.deleteOnExit()
+ val rootDir1 = Files.createTempDir()
+ rootDir1.deleteOnExit()
+ val rootDirs = rootDir0.getName + "," + rootDir1.getName
+ println("Created root dirs: " + rootDirs)
+
+ val shuffleBlockManager = new ShuffleBlockManager(null) {
+ var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
+ override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
+ }
+
+ var diskBlockManager: DiskBlockManager = _
+
+ override def beforeEach() {
+ diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
+ shuffleBlockManager.idToSegmentMap.clear()
+ }
+
+ test("basic block creation") {
+ val blockId = new TestBlockId("test")
+ assertSegmentEquals(blockId, blockId.name, 0, 0)
+
+ val newFile = diskBlockManager.getFile(blockId)
+ writeToFile(newFile, 10)
+ assertSegmentEquals(blockId, blockId.name, 0, 10)
+
+ newFile.delete()
+ }
+
+ test("block appending") {
+ val blockId = new TestBlockId("test")
+ val newFile = diskBlockManager.getFile(blockId)
+ writeToFile(newFile, 15)
+ assertSegmentEquals(blockId, blockId.name, 0, 15)
+ val newFile2 = diskBlockManager.getFile(blockId)
+ assert(newFile === newFile2)
+ writeToFile(newFile2, 12)
+ assertSegmentEquals(blockId, blockId.name, 0, 27)
+ newFile.delete()
+ }
+
+ test("block remapping") {
+ val filename = "test"
+ val blockId0 = new ShuffleBlockId(1, 2, 3)
+ val newFile = diskBlockManager.getFile(filename)
+ writeToFile(newFile, 15)
+ shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15)
+ assertSegmentEquals(blockId0, filename, 0, 15)
+
+ val blockId1 = new ShuffleBlockId(1, 2, 4)
+ val newFile2 = diskBlockManager.getFile(filename)
+ writeToFile(newFile2, 12)
+ shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12)
+ assertSegmentEquals(blockId1, filename, 15, 12)
+
+ assert(newFile === newFile2)
+ newFile.delete()
+ }
+
+ def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
+ val segment = diskBlockManager.getBlockLocation(blockId)
+ assert(segment.file.getName === filename)
+ assert(segment.offset === offset)
+ assert(segment.length === length)
+ }
+
+ def writeToFile(file: File, numBytes: Int) {
+ val writer = new FileWriter(file, true)
+ for (i <- 0 until numBytes) writer.write(i)
+ writer.close()
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index 41ede860d2..0f1ab3d20e 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
index 355784da32..ca3f684668 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
@@ -82,7 +82,7 @@ class OpenHashMapSuite extends FunSuite {
test("null keys") {
val map = new OpenHashMap[String, String]()
for (i <- 1 to 100) {
- map("" + i) = "" + i
+ map(i.toString) = i.toString
}
assert(map.size === 100)
assert(map(null) === null)
@@ -94,7 +94,7 @@ class OpenHashMapSuite extends FunSuite {
test("null values") {
val map = new OpenHashMap[String, String]()
for (i <- 1 to 100) {
- map("" + i) = null
+ map(i.toString) = null
}
assert(map.size === 100)
assert(map("1") === null)
@@ -108,12 +108,12 @@ class OpenHashMapSuite extends FunSuite {
test("changeValue") {
val map = new OpenHashMap[String, String]()
for (i <- 1 to 100) {
- map("" + i) = "" + i
+ map(i.toString) = i.toString
}
assert(map.size === 100)
for (i <- 1 to 100) {
- val res = map.changeValue("" + i, { assert(false); "" }, v => {
- assert(v === "" + i)
+ val res = map.changeValue(i.toString, { assert(false); "" }, v => {
+ assert(v === i.toString)
v + "!"
})
assert(res === i + "!")
@@ -121,7 +121,7 @@ class OpenHashMapSuite extends FunSuite {
// Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a
// bug where changeValue would return the wrong result when the map grew on that insert
for (i <- 101 to 400) {
- val res = map.changeValue("" + i, { i + "!" }, v => { assert(false); v })
+ val res = map.changeValue(i.toString, { i + "!" }, v => { assert(false); v })
assert(res === i + "!")
}
assert(map.size === 400)
@@ -138,11 +138,11 @@ class OpenHashMapSuite extends FunSuite {
test("inserting in capacity-1 map") {
val map = new OpenHashMap[String, String](1)
for (i <- 1 to 100) {
- map("" + i) = "" + i
+ map(i.toString) = i.toString
}
assert(map.size === 100)
for (i <- 1 to 100) {
- assert(map("" + i) === "" + i)
+ assert(map(i.toString) === i.toString)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
new file mode 100644
index 0000000000..4e11e8a628
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
@@ -0,0 +1,145 @@
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+
+class OpenHashSetSuite extends FunSuite {
+
+ test("primitive int") {
+ val set = new OpenHashSet[Int]
+ assert(set.size === 0)
+ assert(!set.contains(10))
+ assert(!set.contains(50))
+ assert(!set.contains(999))
+ assert(!set.contains(10000))
+
+ set.add(10)
+ assert(set.contains(10))
+ assert(!set.contains(50))
+ assert(!set.contains(999))
+ assert(!set.contains(10000))
+
+ set.add(50)
+ assert(set.size === 2)
+ assert(set.contains(10))
+ assert(set.contains(50))
+ assert(!set.contains(999))
+ assert(!set.contains(10000))
+
+ set.add(999)
+ assert(set.size === 3)
+ assert(set.contains(10))
+ assert(set.contains(50))
+ assert(set.contains(999))
+ assert(!set.contains(10000))
+
+ set.add(50)
+ assert(set.size === 3)
+ assert(set.contains(10))
+ assert(set.contains(50))
+ assert(set.contains(999))
+ assert(!set.contains(10000))
+ }
+
+ test("primitive long") {
+ val set = new OpenHashSet[Long]
+ assert(set.size === 0)
+ assert(!set.contains(10L))
+ assert(!set.contains(50L))
+ assert(!set.contains(999L))
+ assert(!set.contains(10000L))
+
+ set.add(10L)
+ assert(set.size === 1)
+ assert(set.contains(10L))
+ assert(!set.contains(50L))
+ assert(!set.contains(999L))
+ assert(!set.contains(10000L))
+
+ set.add(50L)
+ assert(set.size === 2)
+ assert(set.contains(10L))
+ assert(set.contains(50L))
+ assert(!set.contains(999L))
+ assert(!set.contains(10000L))
+
+ set.add(999L)
+ assert(set.size === 3)
+ assert(set.contains(10L))
+ assert(set.contains(50L))
+ assert(set.contains(999L))
+ assert(!set.contains(10000L))
+
+ set.add(50L)
+ assert(set.size === 3)
+ assert(set.contains(10L))
+ assert(set.contains(50L))
+ assert(set.contains(999L))
+ assert(!set.contains(10000L))
+ }
+
+ test("non-primitive") {
+ val set = new OpenHashSet[String]
+ assert(set.size === 0)
+ assert(!set.contains(10.toString))
+ assert(!set.contains(50.toString))
+ assert(!set.contains(999.toString))
+ assert(!set.contains(10000.toString))
+
+ set.add(10.toString)
+ assert(set.size === 1)
+ assert(set.contains(10.toString))
+ assert(!set.contains(50.toString))
+ assert(!set.contains(999.toString))
+ assert(!set.contains(10000.toString))
+
+ set.add(50.toString)
+ assert(set.size === 2)
+ assert(set.contains(10.toString))
+ assert(set.contains(50.toString))
+ assert(!set.contains(999.toString))
+ assert(!set.contains(10000.toString))
+
+ set.add(999.toString)
+ assert(set.size === 3)
+ assert(set.contains(10.toString))
+ assert(set.contains(50.toString))
+ assert(set.contains(999.toString))
+ assert(!set.contains(10000.toString))
+
+ set.add(50.toString)
+ assert(set.size === 3)
+ assert(set.contains(10.toString))
+ assert(set.contains(50.toString))
+ assert(set.contains(999.toString))
+ assert(!set.contains(10000.toString))
+ }
+
+ test("non-primitive set growth") {
+ val set = new OpenHashSet[String]
+ for (i <- 1 to 1000) {
+ set.add(i.toString)
+ }
+ assert(set.size === 1000)
+ assert(set.capacity > 1000)
+ for (i <- 1 to 100) {
+ set.add(i.toString)
+ }
+ assert(set.size === 1000)
+ assert(set.capacity > 1000)
+ }
+
+ test("primitive set growth") {
+ val set = new OpenHashSet[Long]
+ for (i <- 1 to 1000) {
+ set.add(i.toLong)
+ }
+ assert(set.size === 1000)
+ assert(set.capacity > 1000)
+ for (i <- 1 to 100) {
+ set.add(i.toLong)
+ }
+ assert(set.size === 1000)
+ assert(set.capacity > 1000)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
index b9a4b54544..dfd6aed2c4 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
@@ -58,12 +58,12 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite {
test("changeValue") {
val map = new PrimitiveKeyOpenHashMap[Long, String]()
for (i <- 1 to 100) {
- map(i.toLong) = "" + i
+ map(i.toLong) = i.toString
}
assert(map.size === 100)
for (i <- 1 to 100) {
val res = map.changeValue(i.toLong, { assert(false); "" }, v => {
- assert(v === "" + i)
+ assert(v === i.toString)
v + "!"
})
assert(res === i + "!")
@@ -80,11 +80,11 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite {
test("inserting in capacity-1 map") {
val map = new PrimitiveKeyOpenHashMap[Long, String](1)
for (i <- 1 to 100) {
- map(i.toLong) = "" + i
+ map(i.toLong) = i.toString
}
assert(map.size === 100)
for (i <- 1 to 100) {
- assert(map(i.toLong) === "" + i)
+ assert(map(i.toLong) === i.toString)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala
deleted file mode 100644
index b5b3a4abe1..0000000000
--- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.spark.util.hash
-
-import org.scalatest.FunSuite
-
-
-class OpenHashSetSuite extends FunSuite {
-
- test("primitive int") {
- val set = new OpenHashSet[Int]
- assert(set.size === 0)
- set.add(10)
- assert(set.size === 1)
- set.add(50)
- assert(set.size === 2)
- set.add(999)
- assert(set.size === 3)
- set.add(50)
- assert(set.size === 3)
- }
-
- test("primitive long") {
- val set = new OpenHashSet[Long]
- assert(set.size === 0)
- set.add(10L)
- assert(set.size === 1)
- set.add(50L)
- assert(set.size === 2)
- set.add(999L)
- assert(set.size === 3)
- set.add(50L)
- assert(set.size === 3)
- }
-
- test("non-primitive") {
- val set = new OpenHashSet[String]
- assert(set.size === 0)
- set.add(10.toString)
- assert(set.size === 1)
- set.add(50.toString)
- assert(set.size === 2)
- set.add(999.toString)
- assert(set.size === 3)
- set.add(50.toString)
- assert(set.size === 3)
- }
-
- test("non-primitive set growth") {
- val set = new OpenHashSet[String]
- for (i <- 1 to 1000) {
- set.add(i.toString)
- }
- assert(set.size === 1000)
- assert(set.capacity > 1000)
- for (i <- 1 to 100) {
- set.add(i.toString)
- }
- assert(set.size === 1000)
- assert(set.capacity > 1000)
- }
-
- test("primitive set growth") {
- val set = new OpenHashSet[Long]
- for (i <- 1 to 1000) {
- set.add(i.toLong)
- }
- assert(set.size === 1000)
- assert(set.capacity > 1000)
- for (i <- 1 to 100) {
- set.add(i.toLong)
- }
- assert(set.size === 1000)
- assert(set.capacity > 1000)
- }
-}
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index f679cad713..5927f736f3 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
(either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
-worker processes that run computations and store data for your application.
+worker processes that run computations and store data for your application.
Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
the executors. Finally, SparkContext sends *tasks* for the executors to run.
@@ -57,6 +57,18 @@ which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python)
worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar`
and `addFile`.
+## URIs for addJar / addFile
+
+- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor
+ pulls the file from the driver HTTP server
+- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
+- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
+ means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
+ or shared via NFS, GlusterFS, etc.
+
+Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
+Over time this can use up a significant amount of space and will need to be cleaned up.
+
# Monitoring
Each driver program has a web UI, typically on port 4040, that displays information about running
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index 1e5575d657..156a727026 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -98,7 +98,7 @@ permissions on your private key file, you can run `launch` with the
`bin/hadoop` script in that directory. Note that the data in this
HDFS goes away when you stop and restart a machine.
- There is also a *persistent HDFS* instance in
- `/root/presistent-hdfs` that will keep data across cluster restarts.
+ `/root/persistent-hdfs` that will keep data across cluster restarts.
Typically each node has relatively little space of persistent data
(about 3 GB), but you can use the `--ebs-vol-size` option to
`spark-ec2` to attach a persistent EBS volume to each node for
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 65868b76b9..79848380c0 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -73,7 +73,7 @@ def parse_args():
parser.add_option("-v", "--spark-version", default="0.8.0",
help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo",
- default="https://github.com/mesos/spark",
+ default="https://github.com/apache/incubator-spark",
help="Github repo from which to checkout supplied commit hash")
parser.add_option("--hadoop-major-version", default="1",
help="Major version of Hadoop (default: 1)")
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 646682878f..86dd9ca1b3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -21,6 +21,7 @@ import java.util.Random
import scala.math.exp
import org.apache.spark.util.Vector
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo
/**
@@ -51,7 +52,7 @@ object SparkHdfsLR {
System.exit(1)
}
val inputPath = args(1)
- val conf = SparkEnv.get.hadoop.newConfiguration()
+ val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
InputFormatInfo.computePreferredLocations(
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 62f445127c..f65f96ed0c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -2,10 +2,9 @@ package org.apache.spark.graph
import com.esotericsoftware.kryo.Kryo
-import org.apache.spark.graph.impl.MessageToPartition
+import org.apache.spark.graph.impl.{EdgePartition, MessageToPartition}
import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.graph.impl._
-import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.collection.BitSet
class GraphKryoRegistrator extends KryoRegistrator {
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index f26e286003..c4761d7452 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -17,26 +17,11 @@
package org.apache.spark.graph
-import java.nio.ByteBuffer
-
-
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark._
-import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
-import org.apache.spark.Partitioner._
-
+import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.hash.BitSet
-import org.apache.spark.util.hash.OpenHashSet
-import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
-
-
-
-
+import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
/**
@@ -184,9 +169,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next()
- assert(keysIter.hasNext() == false)
+ assert(keysIter.hasNext == false)
val (oldValues, bs) = valuesIter.next()
- assert(valuesIter.hasNext() == false)
+ assert(valuesIter.hasNext == false)
// Allocate the array to store the results into
val newBS = new BitSet(index.capacity)
// Iterate over the active bits in the old bitset and
@@ -246,9 +231,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next()
- assert(keysIter.hasNext() == false)
+ assert(keysIter.hasNext == false)
val (oldValues, bs: BitSet) = valuesIter.next()
- assert(valuesIter.hasNext() == false)
+ assert(valuesIter.hasNext == false)
// Cosntruct a view of the map transformation
val newValues: (Int => U) = (ind: Int) => {
if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
@@ -384,7 +369,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
// Get the location of the key in the index
val pos = index.getPos(k)
// Only if the key is already in the index
- if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) {
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) == 0) {
// Get the actual index
val ind = pos & OpenHashSet.POSITION_MASK
// If this value has already been seen then merge
@@ -642,7 +627,7 @@ object VertexSetRDD {
*
* @note duplicate vertices are discarded arbitrarily
*
- * @tparam the vertex attribute type
+ * @tparam V the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
@@ -656,7 +641,7 @@ object VertexSetRDD {
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
*
- * @tparam the vertex attribute type
+ * @tparam V the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
@@ -673,7 +658,7 @@ object VertexSetRDD {
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
*
- * @tparam the vertex attribute type
+ * @tparam V the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
@@ -710,13 +695,13 @@ object VertexSetRDD {
val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map
val index = indexIter.next()
- assert(!indexIter.hasNext())
+ assert(!indexIter.hasNext)
val values = new Array[C](index.capacity)
val bs = new BitSet(index.capacity)
for ((k,c) <- tblIter) {
// Get the location of the key in the index
val pos = index.getPos(k)
- if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
throw new SparkException("Error: Trying to bind an external index " +
"to an RDD which contains keys that are not in the index.")
} else {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index b80713dbf4..f817435fb8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -4,26 +4,17 @@ import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ArrayBuilder
import org.apache.spark.SparkContext._
-import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner
-import org.apache.spark.rdd
-import org.apache.spark.rdd.RDD
-
-
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
-
-import org.apache.spark.util.hash.BitSet
-import org.apache.spark.util.hash.OpenHashSet
-import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
-
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
/**
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 37a4fb4a5e..ee28d1429e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -1,8 +1,6 @@
package org.apache.spark
-import org.apache.spark.util.hash.BitSet
-import org.apache.spark.util.hash.OpenHashSet
-import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.OpenHashSet
package object graph {