From 2d761e3353651049f6707c74bb5ffdd6e86f6f35 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 12 Sep 2012 14:54:40 -0700 Subject: Ported performance and FT improvements from latest streaming work --- core/src/main/scala/spark/BlockRDD.scala | 3 +- .../src/main/scala/spark/DaemonThreadFactory.scala | 12 ++- core/src/main/scala/spark/HadoopRDD.scala | 5 +- core/src/main/scala/spark/KryoSerializer.scala | 6 ++ core/src/main/scala/spark/MapOutputTracker.scala | 88 ++++++++++++++++++++-- core/src/main/scala/spark/NewHadoopRDD.scala | 10 ++- core/src/main/scala/spark/RDD.scala | 2 +- .../spark/network/ConnectionManagerTest.scala | 5 +- .../scala/spark/scheduler/ShuffleMapTask.scala | 9 +-- .../spark/scheduler/cluster/ClusterScheduler.scala | 1 + .../spark/scheduler/cluster/TaskSetManager.scala | 5 ++ .../mesos/CoarseMesosSchedulerBackend.scala | 4 +- .../main/scala/spark/storage/BlockManager.scala | 15 +++- .../scala/spark/storage/BlockManagerWorker.scala | 2 +- .../main/scala/spark/storage/BlockMessage.scala | 14 +--- core/src/main/scala/spark/storage/BlockStore.scala | 30 ++++---- .../main/scala/spark/storage/StorageLevel.scala | 1 + core/src/main/scala/spark/util/AkkaUtils.scala | 2 + 18 files changed, 153 insertions(+), 61 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala index ea009f0f4f..daabc0d566 100644 --- a/core/src/main/scala/spark/BlockRDD.scala +++ b/core/src/main/scala/spark/BlockRDD.scala @@ -7,7 +7,8 @@ class BlockRDDSplit(val blockId: String, idx: Int) extends Split { } -class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) { +class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String]) + extends RDD[T](sc) { @transient val splits_ = (0 until blockIds.size).map(i => { diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala index 003880c5e8..56e59adeb7 100644 --- a/core/src/main/scala/spark/DaemonThreadFactory.scala +++ b/core/src/main/scala/spark/DaemonThreadFactory.scala @@ -6,9 +6,13 @@ import java.util.concurrent.ThreadFactory * A ThreadFactory that creates daemon threads */ private object DaemonThreadFactory extends ThreadFactory { - override def newThread(r: Runnable): Thread = { - val t = new Thread(r) - t.setDaemon(true) - return t + override def newThread(r: Runnable): Thread = new DaemonThread(r) +} + +private class DaemonThread(r: Runnable = null) extends Thread { + override def run() { + if (r != null) { + r.run() + } } } \ No newline at end of file diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index f282a4023b..0befca582d 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -42,7 +42,8 @@ class HadoopRDD[K, V]( minSplits: Int) extends RDD[(K, V)](sc) { - val serializableConf = new SerializableWritable(conf) + // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it + val confBroadcast = sc.broadcast(new SerializableWritable(conf)) @transient val splits_ : Array[Split] = { @@ -66,7 +67,7 @@ class HadoopRDD[K, V]( val split = theSplit.asInstanceOf[HadoopSplit] var reader: RecordReader[K, V] = null - val conf = serializableConf.value + val conf = confBroadcast.value.value val fmt = createInputFormat(conf) reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) diff --git 
a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala index 65d0532bd5..8a3f565071 100644 --- a/core/src/main/scala/spark/KryoSerializer.scala +++ b/core/src/main/scala/spark/KryoSerializer.scala @@ -10,8 +10,10 @@ import scala.collection.mutable import com.esotericsoftware.kryo._ import com.esotericsoftware.kryo.{Serializer => KSerializer} import com.esotericsoftware.kryo.serialize.ClassSerializer +import com.esotericsoftware.kryo.serialize.SerializableSerializer import de.javakaffee.kryoserializers.KryoReflectionFactorySupport +import spark.broadcast._ import spark.storage._ /** @@ -203,6 +205,10 @@ class KryoSerializer extends Serializer with Logging { kryo.register(classOf[Class[_]], new ClassSerializer(kryo)) kryo.setRegistrationOptional(true) + // Allow sending SerializableWritable + kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer()) + kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer()) + // Register some commonly used Scala singleton objects. Because these // are singletons, we must return the exact same local object when we // deserialize rather than returning a clone as FieldSerializer would. diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index de23eb6f48..82c1391345 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -1,5 +1,6 @@ package spark +import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream} import java.util.concurrent.ConcurrentHashMap import akka.actor._ @@ -10,6 +11,7 @@ import akka.util.Duration import akka.util.Timeout import akka.util.duration._ +import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import spark.storage.BlockManagerId @@ -18,12 +20,11 @@ sealed trait MapOutputTrackerMessage case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage case object StopMapOutputTracker extends MapOutputTrackerMessage -class MapOutputTrackerActor(bmAddresses: ConcurrentHashMap[Int, Array[BlockManagerId]]) -extends Actor with Logging { +class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging { def receive = { case GetMapOutputLocations(shuffleId: Int) => logInfo("Asked to get map output locations for shuffle " + shuffleId) - sender ! bmAddresses.get(shuffleId) + sender ! tracker.getSerializedLocations(shuffleId) case StopMapOutputTracker => logInfo("MapOutputTrackerActor stopped!") @@ -39,15 +40,19 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg val timeout = 10.seconds - private var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]] + var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]] // Incremented every time a fetch fails so that client nodes know to clear // their cache of map output locations if this happens. 
private var generation: Long = 0 private var generationLock = new java.lang.Object + // Cache a serialized version of the output locations for each shuffle to send them out faster + var cacheGeneration = generation + val cachedSerializedLocs = new HashMap[Int, Array[Byte]] + var trackerActor: ActorRef = if (isMaster) { - val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(bmAddresses)), name = actorName) + val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName) logInfo("Registered MapOutputTrackerActor actor") actor } else { @@ -134,15 +139,16 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg } // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) - val fetched = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[BlockManagerId]] + val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]] + val fetchedLocs = deserializeLocations(fetchedBytes) logInfo("Got the output locations") - bmAddresses.put(shuffleId, fetched) + bmAddresses.put(shuffleId, fetchedLocs) fetching.synchronized { fetching -= shuffleId fetching.notifyAll() } - return fetched + return fetchedLocs } else { return locs } @@ -181,4 +187,70 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg } } } + + def getSerializedLocations(shuffleId: Int): Array[Byte] = { + var locs: Array[BlockManagerId] = null + var generationGotten: Long = -1 + generationLock.synchronized { + if (generation > cacheGeneration) { + cachedSerializedLocs.clear() + cacheGeneration = generation + } + cachedSerializedLocs.get(shuffleId) match { + case Some(bytes) => + return bytes + case None => + locs = bmAddresses.get(shuffleId) + generationGotten = generation + } + } + // If we got here, we failed to find the serialized locations in the cache, so we pulled + // out a snapshot of the locations as "locs"; let's serialize and return that + val bytes = serializeLocations(locs) + // Add them into the table only if the generation hasn't changed while we were working + generationLock.synchronized { + if (generation == generationGotten) { + cachedSerializedLocs(shuffleId) = bytes + } + } + return bytes + } + + // Serialize an array of map output locations into an efficient byte format so that we can send + // it to reduce tasks. We do this by grouping together the locations by block manager ID. + def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = { + val out = new ByteArrayOutputStream + val dataOut = new DataOutputStream(out) + dataOut.writeInt(locs.length) + val grouped = locs.zipWithIndex.groupBy(_._1) + dataOut.writeInt(grouped.size) + for ((id, pairs) <- grouped if id != null) { + dataOut.writeUTF(id.ip) + dataOut.writeInt(id.port) + dataOut.writeInt(pairs.length) + for ((_, blockIndex) <- pairs) { + dataOut.writeInt(blockIndex) + } + } + dataOut.close() + out.toByteArray + } + + // Opposite of serializeLocations. 
+ def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = { + val dataIn = new DataInputStream(new ByteArrayInputStream(bytes)) + val length = dataIn.readInt() + val array = new Array[BlockManagerId](length) + val numGroups = dataIn.readInt() + for (i <- 0 until numGroups) { + val ip = dataIn.readUTF() + val port = dataIn.readInt() + val id = new BlockManagerId(ip, port) + val numBlocks = dataIn.readInt() + for (j <- 0 until numBlocks) { + array(dataIn.readInt()) = id + } + } + array + } } diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala index d024d38aa9..14f708a3f8 100644 --- a/core/src/main/scala/spark/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/NewHadoopRDD.scala @@ -28,7 +28,9 @@ class NewHadoopRDD[K, V]( @transient conf: Configuration) extends RDD[(K, V)](sc) { - private val serializableConf = new SerializableWritable(conf) + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it + val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + // private val serializableConf = new SerializableWritable(conf) private val jobtrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") @@ -41,7 +43,7 @@ class NewHadoopRDD[K, V]( @transient private val splits_ : Array[Split] = { val inputFormat = inputFormatClass.newInstance - val jobContext = new JobContext(serializableConf.value, jobId) + val jobContext = new JobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Split](rawSplits.size) for (i <- 0 until rawSplits.size) { @@ -54,9 +56,9 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Split) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopSplit] - val conf = serializableConf.value + val conf = confBroadcast.value.value val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0) - val context = new TaskAttemptContext(serializableConf.value, attemptId) + val context = new TaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance val reader = format.createRecordReader(split.serializableHadoopSplit.value, context) reader.initialize(split.serializableHadoopSplit.value, context) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 3fe8e8a4bf..d28f3593fe 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def getStorageLevel = storageLevel - def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = { + def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = { if (!level.useDisk && level.replication < 2) { throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")") } diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala index 5d21bb793f..555b3454ee 100644 --- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala @@ -8,6 +8,9 @@ import scala.io.Source import java.nio.ByteBuffer import java.net.InetAddress +import akka.dispatch.Await +import akka.util.duration._ + object ConnectionManagerTest extends Logging{ def main(args: Array[String]) { if (args.length < 2) { @@ -53,7 +56,7 @@ object ConnectionManagerTest extends Logging{ 
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]") connManager.sendMessageReliably(slaveConnManagerId, bufferMessage) }) - val results = futures.map(f => f()) + val results = futures.map(f => Await.result(f, 1.second)) val finishTime = System.currentTimeMillis Thread.sleep(5000) diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index b9f0a0d6d0..99984fb557 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -31,7 +31,8 @@ object ShuffleMapTask { return old } else { val out = new ByteArrayOutputStream - val objOut = new ObjectOutputStream(new GZIPOutputStream(out)) + val ser = SparkEnv.get.closureSerializer.newInstance + val objOut = ser.serializeStream(new GZIPOutputStream(out)) objOut.writeObject(rdd) objOut.writeObject(dep) objOut.close() @@ -63,10 +64,8 @@ object ShuffleMapTask { synchronized { val loader = Thread.currentThread.getContextClassLoader val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) - val objIn = new ObjectInputStream(in) { - override def resolveClass(desc: ObjectStreamClass) = - Class.forName(desc.getName, false, loader) - } + val ser = SparkEnv.get.closureSerializer.newInstance + val objIn = ser.deserializeStream(in) val rdd = objIn.readObject().asInstanceOf[RDD[_]] val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]] return (rdd, dep) diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 750231ac31..952c9766bf 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -118,6 +118,7 @@ class ClusterScheduler(sc: SparkContext) */ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = { synchronized { + SparkEnv.set(sc.env) // Mark each slave as alive and remember its hostname for (o <- offers) { slaveIdToHost(o.slaveId) = o.hostname diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 5f98a396b4..e25a11e7c5 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -243,6 +243,11 @@ class TaskSetManager( def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) { val info = taskInfos(tid) + if (info.failed) { + // We might get two task-lost messages for the same task in coarse-grained Mesos mode, + // or even from Mesos itself when acks get delayed. 
+ return + } val index = info.index info.markSuccessful() if (!finished(index)) { diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 31784985dc..fdf007ffb2 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -80,6 +80,8 @@ class CoarseMesosSchedulerBackend( "property, the SPARK_HOME environment variable or the SparkContext constructor") } + val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt + var nextMesosTaskId = 0 def newMesosTaskId(): Int = { @@ -177,7 +179,7 @@ class CoarseMesosSchedulerBackend( val task = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse)) + .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " + taskId) .addResources(createResource("cpus", cpusToUse)) .addResources(createResource("mem", executorMemory)) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index cb7b0c8bc1..3a51f6bd96 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -484,8 +484,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Initiate the replication before storing it locally. This is faster as // data is already serialized and ready for sending val replicationFuture = if (level.replication > 1) { + val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper Future { - replicate(blockId, bytes, level) + replicate(blockId, bufferView, level) } } else { null @@ -537,21 +538,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Replicate block to another node. */ + var firstTime = true + var peers : Seq[BlockManagerId] = null private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) { val tLevel: StorageLevel = new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) - var peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) + if (firstTime) { + peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) + firstTime = false; + } for (peer: BlockManagerId <- peers) { val start = System.nanoTime + data.rewind() logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " - + data.array().length + " Bytes. To node: " + peer) + + data.limit() + " Bytes. 
To node: " + peer) if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), new ConnectionManagerId(peer.ip, peer.port))) { logError("Failed to call syncPutBlock to " + peer) } logDebug("Replicated BlockId " + blockId + " once used " + (System.nanoTime - start) / 1e6 + " s; The size of the data is " + - data.array().length + " bytes.") + data.limit() + " bytes.") } } diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index 0eaa558f44..0ad1ad056c 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -71,7 +71,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging { logDebug("PutBlock " + id + " started from " + startTimeMs + " with data: " + bytes) blockManager.putBytes(id, bytes, level) logDebug("PutBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs) - + " with data size: " + bytes.array().length) + + " with data size: " + bytes.limit) } private def getBlock(id: String): ByteBuffer = { diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index b9833273e5..5e2ccb199a 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -12,7 +12,7 @@ case class GetBlock(id: String) case class GotBlock(id: String, data: ByteBuffer) case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel) -class BlockMessage() extends Logging{ +class BlockMessage() { // Un-initialized: typ = 0 // GetBlock: typ = 1 // GotBlock: typ = 2 @@ -22,8 +22,6 @@ class BlockMessage() extends Logging{ private var data: ByteBuffer = null private var level: StorageLevel = null - initLogging() - def set(getBlock: GetBlock) { typ = BlockMessage.TYPE_GET_BLOCK id = getBlock.id @@ -62,8 +60,6 @@ class BlockMessage() extends Logging{ } id = idBuilder.toString() - logDebug("Set from buffer Result: " + typ + " " + id) - logDebug("Buffer position is " + buffer.position) if (typ == BlockMessage.TYPE_PUT_BLOCK) { val booleanInt = buffer.getInt() @@ -77,23 +73,18 @@ class BlockMessage() extends Logging{ } data.put(buffer) data.flip() - logDebug("Set from buffer Result 2: " + level + " " + data) } else if (typ == BlockMessage.TYPE_GOT_BLOCK) { val dataLength = buffer.getInt() - logDebug("Data length is "+ dataLength) - logDebug("Buffer position is " + buffer.position) data = ByteBuffer.allocate(dataLength) if (dataLength != buffer.remaining) { throw new Exception("Error parsing buffer") } data.put(buffer) data.flip() - logDebug("Set from buffer Result 3: " + data) } val finishTime = System.currentTimeMillis - logDebug("Converted " + id + " from bytebuffer in " + (finishTime - startTime) / 1000.0 + " s") } def set(bufferMsg: BufferMessage) { @@ -145,8 +136,6 @@ class BlockMessage() extends Logging{ buffers += data } - logDebug("Start to log buffers.") - buffers.foreach((x: ByteBuffer) => logDebug("" + x)) /* println() println("BlockMessage: ") @@ -160,7 +149,6 @@ class BlockMessage() extends Logging{ println() */ val finishTime = System.currentTimeMillis - logDebug("Converted " + id + " to buffer message in " + (finishTime - startTime) / 1000.0 + " s") return Message.createBufferMessage(buffers) } diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index f66b5bc897..09287faba0 100644 --- 
a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -87,12 +87,12 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( blockId, sizeEstimate, freeMemory)) } else { - val entry = new Entry(bytes, bytes.array().length, false) - ensureFreeSpace(bytes.array.length) + val entry = new Entry(bytes, bytes.limit, false) + ensureFreeSpace(bytes.limit) memoryStore.synchronized { memoryStore.put(blockId, entry) } - currentMemory += bytes.array().length + currentMemory += bytes.limit logInfo("Block %s stored as %d bytes to memory (free %d)".format( - blockId, bytes.array().length, freeMemory)) + blockId, bytes.limit, freeMemory)) } } @@ -111,12 +111,12 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) return Left(elements.iterator) } else { val bytes = dataSerialize(values) - ensureFreeSpace(bytes.array().length) - val entry = new Entry(bytes, bytes.array().length, false) + ensureFreeSpace(bytes.limit) + val entry = new Entry(bytes, bytes.limit, false) memoryStore.synchronized { memoryStore.put(blockId, entry) } - currentMemory += bytes.array().length + currentMemory += bytes.limit logInfo("Block %s stored as %d bytes to memory (free %d)".format( - blockId, bytes.array.length, freeMemory)) + blockId, bytes.limit, freeMemory)) return Right(bytes) } } @@ -133,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry.deserialized) { return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator) } else { - return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer])) + return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) } } @@ -219,12 +219,12 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) val file = createFile(blockId) if (file != null) { val channel = new RandomAccessFile(file, "rw").getChannel() - val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.array.length) - buffer.put(bytes.array) + val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit) + buffer.put(bytes) channel.close() val finishTime = System.currentTimeMillis logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( - blockId, bytes.array.length, (finishTime - startTime))) + blockId, bytes.limit, (finishTime - startTime))) } else { logError("File not created for block " + blockId) } @@ -233,7 +233,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) def putValues(blockId: String, values: Iterator[Any], level: StorageLevel) : Either[Iterator[Any], ByteBuffer] = { val bytes = dataSerialize(values) - logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes") + logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes") putBytes(blockId, bytes, level) return Right(bytes) } @@ -242,9 +242,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String) val file = getFile(blockId) val length = file.length().toInt val channel = new RandomAccessFile(file, "r").getChannel() - val bytes = ByteBuffer.allocate(length) - bytes.put(channel.map(MapMode.READ_WRITE, 0, length)) - return Some(bytes) + Some(channel.map(MapMode.READ_WRITE, 0, length)) } def getValues(blockId: String): Option[Iterator[Any]] = { diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index 1d38ca13cc..b168c8e869 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ 
b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -66,6 +66,7 @@ class StorageLevel( object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) + val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, 2) val MEMORY_ONLY_DESER = new StorageLevel(false, true, true) diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index 57d212e4ca..df4e23bfd6 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -31,6 +31,8 @@ object AkkaUtils { akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d akka.remote.netty.connection-timeout = 1s + akka.remote.netty.execution-pool-size = 8 + akka.actor.default-dispatcher.throughput = 30 """.format(host, port)) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) -- cgit v1.2.3
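
A few notes on the main techniques in this patch, with illustrative sketches. The HadoopRDD and NewHadoopRDD hunks replace a plain SerializableWritable field with a broadcast variable, because a Hadoop JobConf of roughly 10 KB would otherwise ride along inside every serialized task. The same pattern applies to user code that needs a Configuration on the workers. The sketch below is a hypothetical user-level example, not part of the patch: `sc` and `records` are assumed to come from the caller, "my.app.setting" is a made-up key, and the double unwrap (Broadcast, then SerializableWritable) mirrors what the patched compute() methods do.

import org.apache.hadoop.mapred.JobConf
import spark.{RDD, SerializableWritable, SparkContext}

// Hypothetical example of the broadcast-the-conf pattern (illustrative only).
object ConfBroadcastExample {
  def tagWithSetting(sc: SparkContext, records: RDD[String], conf: JobConf): RDD[(String, String)] = {
    // Broadcast the conf once; workers fetch it a single time instead of
    // receiving a ~10 KB copy inside every serialized task closure.
    val confBroadcast = sc.broadcast(new SerializableWritable(conf))
    records.map { record =>
      // Unwrap on the worker: Broadcast -> SerializableWritable -> JobConf.
      val localConf = confBroadcast.value.value
      (localConf.get("my.app.setting", "default"), record)
    }
  }
}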
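
The bulk of the MapOutputTracker change is the switch from replying to reducers with a raw Array[BlockManagerId] to replying with a compact byte array: locations are grouped by block manager, each ip/port pair is written once, and only the per-map indices are repeated, which keeps the message small when thousands of map outputs live on a handful of hosts. The following is a standalone sketch of that encoding and its inverse, not the patch's code; it uses a simplified Loc(ip, port) case class in place of Spark's BlockManagerId.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Simplified stand-in for BlockManagerId (illustrative, not Spark's class).
case class Loc(ip: String, port: Int)

object LocCodec {
  // One entry per map task; entries pointing at the same Loc are grouped so
  // each (ip, port) pair is written only once.
  def serialize(locs: Array[Loc]): Array[Byte] = {
    val out = new ByteArrayOutputStream
    val dataOut = new DataOutputStream(out)
    dataOut.writeInt(locs.length)                  // total number of map outputs
    val grouped = locs.zipWithIndex.groupBy(_._1)  // Loc -> Array[(Loc, mapIndex)]
    dataOut.writeInt(grouped.size)                 // number of distinct locations
    for ((loc, pairs) <- grouped) {
      dataOut.writeUTF(loc.ip)
      dataOut.writeInt(loc.port)
      dataOut.writeInt(pairs.length)               // how many map outputs live here
      for ((_, mapIndex) <- pairs) {
        dataOut.writeInt(mapIndex)
      }
    }
    dataOut.close()
    out.toByteArray
  }

  // Inverse of serialize: rebuild the per-map-task array of locations.
  def deserialize(bytes: Array[Byte]): Array[Loc] = {
    val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
    val array = new Array[Loc](dataIn.readInt())
    val numGroups = dataIn.readInt()
    for (_ <- 0 until numGroups) {
      val loc = Loc(dataIn.readUTF(), dataIn.readInt())
      val numBlocks = dataIn.readInt()
      for (_ <- 0 until numBlocks) {
        array(dataIn.readInt()) = loc
      }
    }
    array
  }

  def main(args: Array[String]) {
    // Round trip: 1000 map outputs spread over two hosts encode to roughly 4 KB
    // (mostly the 4-byte indices) and come back as an identical sequence.
    val locs = Array.tabulate(1000)(i => Loc("host" + (i % 2), 7077))
    val bytes = serialize(locs)
    println("Encoded 1000 locations in " + bytes.length + " bytes")
    assert(deserialize(bytes).toSeq == locs.toSeq)
  }
}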
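
getSerializedLocations is also worth a note. The serialized bytes for each shuffle are cached so that a stage's worth of reducers hits the serializer once, and the whole cache is discarded whenever the generation counter advances (the counter is bumped on fetch failures so that stale map locations get dropped, as the comment in the diff says). The expensive serialization runs outside the lock, and the result is only stored if the generation is still the one observed when the cache was consulted. Below is a generic, hedged sketch of that pattern with the Spark types removed; GenerationCache and compute are illustrative names, and unlike the patch the snapshot of the underlying data is folded into compute.

import scala.collection.mutable.HashMap

// Generic "cache derived values per generation" pattern, modeled on
// MapOutputTracker.getSerializedLocations (illustrative, not Spark code).
class GenerationCache[K, V](currentGeneration: () => Long, compute: K => V) {
  private val lock = new Object
  private val cache = new HashMap[K, V]
  private var cacheGeneration = currentGeneration()

  def get(key: K): V = {
    var generationSeen = -1L
    lock.synchronized {
      val gen = currentGeneration()
      if (gen > cacheGeneration) {
        // The generation advanced (e.g. after a fetch failure), so every
        // cached entry is suspect: drop them all.
        cache.clear()
        cacheGeneration = gen
      }
      cache.get(key) match {
        case Some(value) => return value
        case None => generationSeen = gen
      }
    }
    // Do the expensive work (serialization, in the patch) outside the lock.
    val value = compute(key)
    lock.synchronized {
      // Only publish the result if nothing was invalidated in the meantime.
      if (currentGeneration() == generationSeen) {
        cache(key) = value
      }
    }
    value
  }
}

Used this way, the tracker side of the patch roughly corresponds to new GenerationCache[Int, Array[Byte]](() => generation, shuffleId => serializeLocations(bmAddresses.get(shuffleId))), assuming those names from the diff are in scope.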
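
Several hunks (BlockManager, BlockManagerWorker, MemoryStore, DiskStore) replace bytes.array().length with bytes.limit and hand out bytes.duplicate() instead of the buffer itself. The reason is that array() only works for heap buffers with an accessible backing array; the memory-mapped buffer that DiskStore.getBytes now returns, like any direct buffer, throws UnsupportedOperationException, and even for heap buffers the backing array can be larger than the data. limit reflects the amount of readable data regardless of how the buffer is backed, and duplicate() gives a view with its own position and limit, so the replication future and the local store can read the same bytes without corrupting each other's position; replicate() additionally calls rewind() before each send. A small JVM-level demonstration, independent of Spark:

import java.nio.ByteBuffer

object ByteBufferViews {
  def main(args: Array[String]) {
    val payload = "0123456789".getBytes("UTF-8")

    // A direct buffer has no accessible backing array, so array() would throw,
    // but limit still reports the number of readable bytes after flip().
    val direct = ByteBuffer.allocateDirect(payload.length)
    direct.put(payload)
    direct.flip()
    println("hasArray = " + direct.hasArray + ", limit = " + direct.limit)

    // duplicate() shares the same bytes but has an independent position, so one
    // reader can consume its view without disturbing the original buffer.
    val view = direct.duplicate()
    while (view.hasRemaining) view.get()
    println("view.position = " + view.position + ", original.position = " + direct.position)

    // rewind() resets a view's position to 0 before re-reading it, which is
    // what replicate() does before sending the block to each peer.
    view.rewind()
    println("after rewind, view.remaining = " + view.remaining)
  }
}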
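
Finally, the AkkaUtils hunk adds two throughput-oriented settings to the generated config: akka.remote.netty.execution-pool-size controls how many threads the Netty remoting layer uses to dispatch inbound messages, and akka.actor.default-dispatcher.throughput lets each actor process up to 30 messages per scheduling turn before yielding its thread (the stock default is 5), trading a little fairness for fewer context switches. The snippet below only checks, under the assumption that the surrounding AkkaUtils code builds the string with ConfigFactory.parseString as the quoting suggests, that such overrides parse and can be read back; the values are the ones from the patch.

import com.typesafe.config.ConfigFactory

// Minimal, illustrative check of the two new Akka settings.
object AkkaTuningCheck {
  def main(args: Array[String]) {
    val akkaConf = ConfigFactory.parseString("""
      akka.remote.netty.execution-pool-size = 8
      akka.actor.default-dispatcher.throughput = 30
      """)
    println(akkaConf.getInt("akka.remote.netty.execution-pool-size"))    // 8
    println(akkaConf.getInt("akka.actor.default-dispatcher.throughput")) // 30
    // An ActorSystem created with this config, e.g. ActorSystem("spark", akkaConf),
    // picks these values up, with everything else falling back to reference.conf.
  }
}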