author     Andy Konwinski <andyk@berkeley.edu>   2012-09-25 15:19:57 -0700
committer  Andy Konwinski <andyk@berkeley.edu>   2012-09-25 15:19:57 -0700
commit     8d30fe616eacaa3263d46ca20153d1c4e518c16c (patch)
tree       6369b667915663582d4b85c858e516d59a7b2b5b /core
parent     8f7dfcf332b0e56bb0d7e0b97b60a6cb4a8735d1 (diff)
parent     aa50d5b9a2b3c5b2adc958c96d88a623fa709e63 (diff)
Merge remote-tracking branch 'public-spark/dev' into doc
Diffstat (limited to 'core')
19 files changed, 223 insertions, 124 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 6516bea157..b0daa70cfd 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -9,9 +9,9 @@ package spark * known as map-side aggregations. When set to false, * mergeCombiners function is not used. */ -class Aggregator[K, V, C] ( +case class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, val mergeCombiners: (C, C) => C, val mapSideCombine: Boolean = true) - extends Serializable + diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala index e6ad4dd28e..05ca846c85 100644 --- a/core/src/main/scala/spark/HttpFileServer.scala +++ b/core/src/main/scala/spark/HttpFileServer.scala @@ -20,7 +20,7 @@ class HttpFileServer extends Logging { fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(fileDir) + httpServer = new HttpServer(baseDir) httpServer.start() serverUri = httpServer.uri } @@ -30,11 +30,13 @@ class HttpFileServer extends Logging { } def addFile(file: File) : String = { - return addFileToDir(file, fileDir) + addFileToDir(file, fileDir) + return serverUri + "/files/" + file.getName } def addJar(file: File) : String = { - return addFileToDir(file, jarDir) + addFileToDir(file, jarDir) + return serverUri + "/jars/" + file.getName } def addFileToDir(file: File, dir: File) : String = { diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 64018f8c6b..aa1d00c63c 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -1,11 +1,10 @@ package spark import java.io.EOFException -import java.net.URL import java.io.ObjectInputStream +import java.net.URL +import java.util.{Date, HashMap => JHashMap} import java.util.concurrent.atomic.AtomicLong -import java.util.{HashMap => JHashMap} -import java.util.Date import java.text.SimpleDateFormat import scala.collection.Map @@ -50,9 +49,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, - partitioner: Partitioner): RDD[(K, C)] = { - val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) - new ShuffledRDD(self, aggregator, partitioner) + partitioner: Partitioner, + mapSideCombine: Boolean = true): RDD[(K, C)] = { + val aggregator = + if (mapSideCombine) { + new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + } else { + // Don't apply map-side combiner. + // A sanity check to make sure mergeCombiners is not defined. 
+ assert(mergeCombiners == null) + new Aggregator[K, V, C](createCombiner, mergeValue, null, false) + } + new ShuffledAggregatedRDD(self, aggregator, partitioner) } def combineByKey[C](createCombiner: V => C, @@ -65,7 +73,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } - + def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = { val map = new JHashMap[K, V] @@ -116,13 +124,24 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( groupByKey(new HashPartitioner(numSplits)) } - def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner) - bufs.flatMapValues(buf => buf) + /** + * Repartition the RDD using the specified partitioner. If mapSideCombine is + * true, Spark will group values of the same key together on the map side + * before the repartitioning. If a large number of duplicated keys are + * expected, and the size of the keys are large, mapSideCombine should be set + * to true. + */ + def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = { + if (mapSideCombine) { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, partitioner) + bufs.flatMapValues(buf => buf) + } else { + new RepartitionShuffledRDD(self, partitioner) + } } def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { @@ -194,17 +213,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) - + def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MappedValuesRDD(self, cleanF) } - + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } - + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { val cg = new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), @@ -215,12 +234,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) } } - + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { val cg = new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], - other1.asInstanceOf[RDD[(_, _)]], + other1.asInstanceOf[RDD[(_, _)]], other2.asInstanceOf[RDD[(_, _)]]), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) @@ -289,7 +308,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } - + def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { saveAsNewAPIHadoopFile(path, 
getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } @@ -363,7 +382,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) saveAsHadoopDataset(conf) } - + def saveAsHadoopDataset(conf: JobConf) { val outputFormatClass = conf.getOutputFormat val keyClass = conf.getOutputKeyClass @@ -377,7 +396,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( if (valueClass == null) { throw new SparkException("Output value class not set") } - + logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") val writer = new HadoopWriter(conf) @@ -390,14 +409,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( writer.setup(context.stageId, context.splitId, attemptNumber) writer.open() - + var count = 0 while(iter.hasNext) { val record = iter.next count += 1 writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) } - + writer.close() writer.commit() } @@ -413,28 +432,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( self: RDD[(K, V)]) - extends Logging + extends Logging with Serializable { def sortByKey(ascending: Boolean = true): RDD[(K,V)] = { - val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending)) - new SortedRDD(rangePartitionedRDD, ascending) + new ShuffledSortedRDD(self, ascending) } } -class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean) - extends RDD[(K, V)](prev.context) { - - override def splits = prev.splits - override val partitioner = prev.partitioner - override val dependencies = List(new OneToOneDependency(prev)) - - override def compute(split: Split) = { - prev.iterator(split).toArray - .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator - } -} - class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) @@ -444,7 +449,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)] class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) extends RDD[(K, U)](prev.context) { - + override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index d28f3593fe..efe248896a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -68,6 +68,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def preferredLocations(split: Split): Seq[String] = Nil def context = sc + + def elementClassManifest: ClassManifest[T] = classManifest[T] // Get a unique ID for this RDD val id = sc.newRddId() diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala index 61a70beaf1..5f26bd2a7b 100644 --- a/core/src/main/scala/spark/Serializer.scala +++ b/core/src/main/scala/spark/Serializer.scala @@ -43,7 +43,7 @@ trait SerializerInstance { def deserializeMany(buffer: ByteBuffer): Iterator[Any] = { // Default implementation uses deserializeStream buffer.rewind() - deserializeStream(new ByteBufferInputStream(buffer)).toIterator + deserializeStream(new ByteBufferInputStream(buffer)).asIterator } } @@ 
-74,7 +74,7 @@ trait DeserializationStream { * Read the elements of this stream through an iterator. This can only be called once, as * reading each element will consume data from the input source. */ - def toIterator: Iterator[Any] = new Iterator[Any] { + def asIterator: Iterator[Any] = new Iterator[Any] { var gotNext = false var finished = false var nextValue: Any = null diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 3616d8e47e..a7346060b3 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -1,29 +1,89 @@ package spark +import scala.collection.mutable.ArrayBuffer import java.util.{HashMap => JHashMap} + class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx override def hashCode(): Int = idx } -class ShuffledRDD[K, V, C]( + +/** + * The resulting RDD from a shuffle (e.g. repartitioning of data). + */ +abstract class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], aggregator: Aggregator[K, V, C], - part : Partitioner) + part : Partitioner) extends RDD[(K, C)](parent.context) { - //override val partitioner = Some(part) + override val partitioner = Some(part) - + @transient val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) override def splits = splits_ - + override def preferredLocations(split: Split) = Nil - + val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) override val dependencies = List(dep) +} + + +/** + * Repartition a key-value pair RDD. + */ +class RepartitionShuffledRDD[K, V]( + @transient parent: RDD[(K, V)], + part : Partitioner) + extends ShuffledRDD[K, V, V]( + parent, + Aggregator[K, V, V](null, null, null, false), + part) { + + override def compute(split: Split): Iterator[(K, V)] = { + val buf = new ArrayBuffer[(K, V)] + val fetcher = SparkEnv.get.shuffleFetcher + def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } + fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) + buf.iterator + } +} + + +/** + * A sort-based shuffle (that doesn't apply aggregation). It does so by first + * repartitioning the RDD by range, and then sort within each range. + */ +class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V]( + @transient parent: RDD[(K, V)], + ascending: Boolean) + extends RepartitionShuffledRDD[K, V]( + parent, + new RangePartitioner(parent.splits.size, parent, ascending)) { + + override def compute(split: Split): Iterator[(K, V)] = { + // By separating this from RepartitionShuffledRDD, we avoided a + // buf.iterator.toArray call, thus avoiding building up the buffer twice. + val buf = new ArrayBuffer[(K, V)] + def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } + SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) + buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator + } +} + + +/** + * The resulting RDD from shuffle and running (hash-based) aggregation. 
+ */ +class ShuffledAggregatedRDD[K, V, C]( + @transient parent: RDD[(K, V)], + aggregator: Aggregator[K, V, C], + part : Partitioner) + extends ShuffledRDD[K, V, C](parent, aggregator, part) { override def compute(split: Split): Iterator[(K, C)] = { val combiners = new JHashMap[K, C] diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 7473b40aa3..6ffae8e85f 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -40,6 +40,8 @@ class SparkEnv ( blockManager.stop() blockManager.master.stop() actorSystem.shutdown() + // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit + Thread.sleep(100) actorSystem.awaitTermination() // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit Thread.sleep(100) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 07aa18e540..5a3f8bde43 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -163,6 +163,8 @@ object Utils extends Logging { logInfo("Untarring " + filename) Utils.execute(Seq("tar", "-xf", filename), targetDir) } + // Make the file executable - That's necessary for scripts + FileUtil.chmod(filename, "a+x") } /** diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index 31d48b82b9..4c81a1b447 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va } def coresLeft: Int = desc.cores - coresGranted + + private var _retryCount = 0 + + def retryCount = _retryCount + + def incrementRetryCount = { + _retryCount += 1 + _retryCount + } } diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala index 50b0c6f95b..8d458ac39c 100644 --- a/core/src/main/scala/spark/deploy/master/JobState.scala +++ b/core/src/main/scala/spark/deploy/master/JobState.scala @@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") type JobState = Value val WAITING, RUNNING, FINISHED, FAILED = Value + + val MAX_NUM_RETRY = 10 } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index c98dddea7b..5cc73633ab 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -1,19 +1,18 @@ package spark.deploy.master -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} - import akka.actor._ -import spark.{Logging, Utils} -import spark.util.AkkaUtils +import akka.actor.Terminated +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} + import java.text.SimpleDateFormat import java.util.Date -import akka.remote.RemoteClientLifeCycleEvent + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + import spark.deploy._ -import akka.remote.RemoteClientShutdown -import akka.remote.RemoteClientDisconnected -import spark.deploy.RegisterWorker -import spark.deploy.RegisterWorkerFailed -import akka.actor.Terminated +import spark.{Logging, SparkException, Utils} +import spark.util.AkkaUtils + class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new 
SimpleDateFormat("yyyyMMddHHmmss") // For job IDs @@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { exec.state = state exec.job.actor ! ExecutorUpdated(execId, state, message) if (ExecutorState.isFinished(state)) { + val jobInfo = idToJob(jobId) // Remove this executor from the worker and job logInfo("Removing executor " + exec.fullId + " because it is " + state) - idToJob(jobId).removeExecutor(exec) + jobInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) - // TODO: the worker would probably want to restart the executor a few times - schedule() + + // Only retry certain number of times so we don't go into an infinite loop. + if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) { + schedule() + } else { + val e = new SparkException("Job %s wth ID %s failed %d times.".format( + jobInfo.desc.name, jobInfo.id, jobInfo.retryCount)) + logError(e.getMessage, e) + throw e + //System.exit(1) + } } } case None => @@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { addressToWorker.get(address).foreach(removeWorker) addressToJob.get(address).foreach(removeJob) } - + case RequestMasterState => { sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList) } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 7043361020..e2a9df275a 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -75,7 +75,8 @@ class ExecutorRunner( def buildCommandSeq(): Seq[String] = { val command = jobDesc.command - val runScript = new File(sparkHome, "run").getCanonicalPath + val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"; + val runScript = new File(sparkHome, script).getCanonicalPath Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables) } diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 8f975c52d4..9999b6ba80 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -38,6 +38,10 @@ class Executor extends Logging { System.setProperty(key, value) } + // Create our ClassLoader and set it on this thread + urlClassLoader = createClassLoader() + Thread.currentThread.setContextClassLoader(urlClassLoader) + // Initialize Spark environment (using system properties read above) env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false) SparkEnv.set(env) @@ -45,11 +49,6 @@ class Executor extends Logging { // Start worker thread pool threadPool = new ThreadPoolExecutor( 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) - - // Create our ClassLoader and set it on this thread - urlClassLoader = createClassLoader() - Thread.currentThread.setContextClassLoader(urlClassLoader) - } def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 3bcc588015..745aa0c939 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -44,7 +44,8 @@ object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. 
- def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { + def serializeFileSet( + set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { val old = cache.get(stageId) if (old != null) { return old @@ -59,7 +60,6 @@ object ShuffleMapTask { } } - def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = { synchronized { val loader = Thread.currentThread.getContextClassLoader @@ -113,7 +113,8 @@ class ShuffleMapTask( out.writeInt(bytes.length) out.write(bytes) - val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache) + val fileSetBytes = ShuffleMapTask.serializeFileSet( + fileSet, stageId, ShuffleMapTask.fileSetCache) out.writeInt(fileSetBytes.length) out.write(fileSetBytes) val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache) @@ -172,7 +173,7 @@ class ShuffleMapTask( buckets.map(_.iterator) } else { // No combiners (no map-side aggregation). Simply partition the map output. - val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)]) + val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)]) for (elem <- rdd.iterator(split)) { val pair = elem.asInstanceOf[(Any, Any)] val bucketId = partitioner.getPartition(pair._1) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 3a51f6bd96..15748b70d5 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -614,10 +614,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = { - /*serializer.newInstance().deserializeMany(bytes)*/ - val ser = serializer.newInstance() bytes.rewind() - return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator + val ser = serializer.newInstance() + return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator } def stop() { @@ -632,7 +631,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m object BlockManager { def getNumParallelFetchesFromSystemProperties(): Int = { - System.getProperty("spark.blockManager.parallelFetches", "8").toInt + System.getProperty("spark.blockManager.parallelFetches", "4").toInt } def getMaxMemoryFromSystemProperties(): Long = { diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 09287faba0..d505df66a7 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -1,6 +1,6 @@ package spark.storage -import java.io.{File, RandomAccessFile} +import java.io.{File, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode import java.util.{LinkedHashMap, UUID} @@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} import scala.collection.mutable.ArrayBuffer +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + import spark.{Utils, Logging, Serializer, SizeEstimator} /** * Abstract class to store blocks */ -abstract class BlockStore(blockManager: BlockManager) extends Logging { +abstract class BlockStore(val blockManager: BlockManager) extends Logging { initLogging() def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) @@ -131,7 
+133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) return None } if (entry.deserialized) { - return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator) + return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) } else { return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) } @@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis val file = createFile(blockId) - if (file != null) { - val channel = new RandomAccessFile(file, "rw").getChannel() - val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) - buffer.put(bytes) - channel.close() - val finishTime = System.currentTimeMillis - logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( - blockId, bytes.limit, (finishTime - startTime))) - } else { - logError("File not created for block " + blockId) - } + val channel = new RandomAccessFile(file, "rw").getChannel() + val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) + buffer.put(bytes) + channel.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( + blockId, bytes.limit, (finishTime - startTime))) } def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) - : Either[Iterator[Any], ByteBuffer] = { - val bytes = dataSerialize(values) - logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes") - putBytes(blockId, bytes, level) - return Right(bytes) + : Either[Iterator[Any], ByteBuffer] = { + + logDebug("Attempting to write values for block " + blockId) + val file = createFile(blockId) + val fileOut = new FastBufferedOutputStream(new FileOutputStream(file)) + val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) + objOut.writeAll(values) + objOut.close() + + // Return a byte buffer for the contents of the file + val channel = new RandomAccessFile(file, "rw").getChannel() + Right(channel.map(MapMode.READ_WRITE, 0, channel.size())) } def getBytes(blockId: String): Option[ByteBuffer] = { @@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) newFile.getParentFile.mkdirs() return newFile } else { - logError("File for block " + blockId + " already exists on disk, " + file) - return null + throw new Exception("File for block " + blockId + " already exists on disk, " + file) } } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index b7b8a79327..93b876d205 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter val clusterUrl = "local-cluster[2,1,512]" - var sc: SparkContext = _ + @transient var sc: SparkContext = _ after { if (sc != null) { diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala index 500af1eb90..fd7a7bd589 100644 --- a/core/src/test/scala/spark/FileServerSuite.scala +++ b/core/src/test/scala/spark/FileServerSuite.scala @@ -3,14 +3,14 @@ package spark import com.google.common.io.Files import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter -import java.io.{File, PrintWriter} +import java.io.{File, PrintWriter, FileReader, BufferedReader} import SparkContext._ class FileServerSuite extends FunSuite with BeforeAndAfter { - var sc: SparkContext = 
_ - var tmpFile : File = _ - var testJarFile : File = _ + @transient var sc: SparkContext = _ + @transient var tmpFile : File = _ + @transient var testJarFile : File = _ before { // Create a sample text file @@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { sc.addFile(tmpFile.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { - val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile)) + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) val fileVal = in.readLine().toInt in.close() _ * fileVal + _ * fileVal @@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int]) + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt a + b @@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { sc.addFile(tmpFile.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { - val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile)) + val in = new BufferedReader(new FileReader("FileServerSuite.txt")) val fileVal = in.readLine().toInt in.close() _ * fileVal + _ * fileVal @@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter { assert(result.toSet === Set((1,200), (2,300), (3,500))) } - test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile() sc.addJar(sampleJarFile) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int]) + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt a + b }.collect() assert(result.toSet === Set((1,2), (2,7), (3,121))) } - -}
\ No newline at end of file +} diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index f622c413f7..9d7e2591f1 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() sc = null } } - + test("groupByKey") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { val valuesFor2 = groups.find(_._1 == 2).get._2 assert(valuesFor2.toList.sorted === List(1)) } - + test("groupByKey with many output partitions") { sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) @@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { (4, (ArrayBuffer(), ArrayBuffer('w'))) )) } - + test("zero-partition RDD") { sc = new SparkContext("local", "test") val emptyDir = Files.createTempDir() @@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { assert(file.splits.size == 0) assert(file.collect().toList === Nil) // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) } test("map-side combine") { @@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { _+_, _+_, false) - val shuffledRdd = new ShuffledRDD( + val shuffledRdd = new ShuffledAggregatedRDD( pairs, aggregator, new HashPartitioner(2)) assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1))) @@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // not see an exception because mergeCombine should not have been called. val aggregatorWithException = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false) - val shuffledRdd1 = new ShuffledRDD( + val shuffledRdd1 = new ShuffledAggregatedRDD( pairs, aggregatorWithException, new HashPartitioner(2)) assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1))) @@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // expect to see an exception thrown. val aggregatorWithException1 = new Aggregator[Int, Int, Int]( (v: Int) => v, _+_, ShuffleSuite.mergeCombineException) - val shuffledRdd2 = new ShuffledRDD( + val shuffledRdd2 = new ShuffledAggregatedRDD( pairs, aggregatorWithException1, new HashPartitioner(2)) evaluating { shuffledRdd2.collect() } should produce [SparkException] } |
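
The bulk of this diff makes map-side combining optional on the shuffle path: Aggregator becomes a case class carrying a mapSideCombine flag, combineByKey and partitionBy expose that flag as a parameter, and the old ShuffledRDD is split into ShuffledAggregatedRDD, RepartitionShuffledRDD, and ShuffledSortedRDD. Below is a minimal sketch of how the new parameters would be called from the Spark-0.6-era API in this tree; the SparkContext setup mirrors the test suites above, and the sample data and object name are illustrative only.

```scala
import spark.SparkContext
import spark.SparkContext._
import spark.HashPartitioner

object MapSideCombineExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "map-side-combine example")
    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))

    // Default path: combiners are built on the map side before the shuffle.
    val summed = pairs.combineByKey[Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
      new HashPartitioner(2))

    // New in this commit: skip the map-side combine entirely. mergeCombiners
    // must then be null, since combiners are only built on the reduce side.
    val summedNoMapSide = pairs.combineByKey[Int](
      (v: Int) => v,
      (c: Int, v: Int) => c + v,
      null,
      new HashPartitioner(2),
      mapSideCombine = false)

    // partitionBy now also takes the flag; by default it just repartitions
    // (RepartitionShuffledRDD) without grouping values on the map side.
    val repartitioned = pairs.partitionBy(new HashPartitioner(2))

    println(summed.collect().toSeq)
    println(summedNoMapSide.collect().toSeq)
    println(repartitioned.collect().toSeq)
    sc.stop()
  }
}
```

Passing mapSideCombine = false skips building combiners before the shuffle, which is why the diff asserts that mergeCombiners is null in that branch: all merging then happens on the reduce side, in ShuffledAggregatedRDD.compute.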