author     Aaron Davidson <aaron@databricks.com>    2013-10-13 11:15:02 -0700
committer  Aaron Davidson <aaron@databricks.com>    2013-10-13 11:15:02 -0700
commit     da896115ec8861f78922fa9502ef257b1e54e3c2
tree       f1b48c15709b5e6f322f699f9f04981fb3caaf5b
parent     d60352283cdc34722749024f6fd47e9309f01e37
Change BlockId filename to name + rest of Patrick's comments
11 files changed, 39 insertions, 36 deletions
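In short, the patch splits BlockId's old filename into two notions: name, the canonical string identifier used for serialization, display, equality, and hashing; and asFilename, the on-disk name for code paths that actually touch files (currently just an alias for name). A condensed sketch of the resulting abstraction, distilled from the BlockId.scala hunk below:

    // Condensed sketch only; the full definition is in the BlockId.scala diff below.
    abstract class BlockId {
      def name: String        // globally unique identifier, usable for ser/de
      def asFilename = name   // physical filename; only meaningful for file-backed blocks
      override def toString = name
      override def hashCode = name.hashCode
    }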
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index 299162f12c..097b2ad6d5 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
   @Override
   public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
     BlockId blockId = BlockId.apply(blockIdString);
-    String path = pResolver.getAbsolutePath(blockId.filename());
+    String path = pResolver.getAbsolutePath(blockId.asFilename());
     // if getFilePath returns null, close the channel
     if (path == null) {
       //ctx.close();
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 5948fcef88..919f9765de 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging {
   }
 
   def write(id: Long, value: Any) {
-    val file = new File(broadcastDir, BroadcastBlockId(id).filename)
+    val file = new File(broadcastDir, BroadcastBlockId(id).asFilename)
     val out: OutputStream = {
       if (compress) {
         compressionCodec.compressedOutputStream(new FileOutputStream(file))
@@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging {
   }
 
   def read[T](id: Long): T = {
-    val url = serverUri + "/" + BroadcastBlockId(id).filename
+    val url = serverUri + "/" + BroadcastBlockId(id).asFilename
     val in = {
       if (compress) {
         compressionCodec.compressedInputStream(new URL(url).openStream())
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index 9ade373823..1b9fa1e53a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -30,8 +30,8 @@ private[spark] class FileHeader (
     val buf = Unpooled.buffer()
     buf.capacity(FileHeader.HEADER_SIZE)
     buf.writeInt(fileLen)
-    buf.writeInt(blockId.filename.length)
-    blockId.filename.foreach((x: Char) => buf.writeByte(x))
+    buf.writeInt(blockId.name.length)
+    blockId.name.foreach((x: Char) => buf.writeByte(x))
     //padding the rest of header
     if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
       buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index bb1c8ac20e..eac2177b54 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging {
     try {
       fc.init()
       fc.connect(host, port)
-      fc.sendRequest(blockId.filename)
+      fc.sendRequest(blockId.asFilename)
       fc.waitForClose()
       fc.close()
     } catch {
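In the FileHeader hunk above, the block's name (not its filename) is what goes over the wire: the header carries the name length followed by one byte per character, zero-padded to a fixed size. A standalone sketch of that framing, with a hypothetical HeaderSize constant standing in for FileHeader.HEADER_SIZE:

    import java.nio.ByteBuffer

    val HeaderSize = 40  // hypothetical; the real value is FileHeader.HEADER_SIZE

    def encodeHeader(fileLen: Int, blockName: String): ByteBuffer = {
      val buf = ByteBuffer.allocate(HeaderSize)
      buf.putInt(fileLen)                        // payload length
      buf.putInt(blockName.length)               // name length in characters
      blockName.foreach(c => buf.put(c.toByte))  // one byte per char, as in FileHeader
      while (buf.position() < HeaderSize)        // zero-pad to the fixed header size
        buf.put(0.toByte)
      buf.flip()
      buf
    }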
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 611a44e5b9..b88fbaa4d1 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
     val dirId = hash % localDirs.length
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
     val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
-    val file = new File(subDir, blockId.filename)
+    val file = new File(subDir, blockId.asFilename)
     return file.getAbsolutePath
   }
 }
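ShuffleSender resolves asFilename against a two-level directory layout: a hash picks a top-level local dir, and a second derived value picks a hex-named subdirectory. A self-contained sketch of that resolution, with localDirs, subDirsPerLocalDir, and a non-negative hash as assumed inputs:

    import java.io.File

    def resolveFile(localDirs: Array[File], subDirsPerLocalDir: Int,
                    hash: Int, filename: String): File = {
      val dirId = hash % localDirs.length                             // pick a local dir
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir   // pick a subdir
      val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
      new File(subDir, filename)                                      // e.g. blockId.asFilename
    }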
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 0fc212d8f9..b477a82e33 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -25,8 +25,11 @@ package org.apache.spark.storage
  * If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
  */
 private[spark] abstract class BlockId {
-  // Physical filename and unique identifier of this Block.
-  def filename: String
+  /** A globally unique identifier for this Block. Can be used for ser/de. */
+  def name: String
+
+  /** Physical filename for this block. May not be valid for Blocks that are not file-backed. */
+  def asFilename = name
 
   // convenience methods
   def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
@@ -34,41 +37,40 @@ private[spark] abstract class BlockId {
   def isShuffle = isInstanceOf[ShuffleBlockId]
   def isBroadcast = isInstanceOf[BroadcastBlockId]
 
-  override def toString = filename
-  override def hashCode = filename.hashCode
+  override def toString = name
+  override def hashCode = name.hashCode
   override def equals(other: Any): Boolean = other match {
-    case o: BlockId => filename.equals(o.filename)
+    case o: BlockId => getClass == o.getClass && name.equals(o.name)
     case _ => false
   }
 }
 
 private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
-  def filename = "rdd_" + rddId + "_" + splitIndex
+  def name = "rdd_" + rddId + "_" + splitIndex
 }
 
 private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
   extends BlockId {
-  def filename = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+  def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }
 
 private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
-  def filename = "broadcast_" + broadcastId
+  def name = "broadcast_" + broadcastId
 }
 
 private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
-  def filename = "taskresult_" + taskId
+  def name = "taskresult_" + taskId
 }
 
 private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
-  def filename = "input-" + streamId + "-" + uniqueId
+  def name = "input-" + streamId + "-" + uniqueId
 }
 
 // Intended only for testing purposes
 private[spark] case class TestBlockId(id: String) extends BlockId {
-  def filename = "test_" + id
+  def name = "test_" + id
 }
 
-// Contains deserialization logic (i.e., String -> BlockId).
 private[spark] object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
@@ -77,6 +79,7 @@ private[spark] object BlockId {
   val StreamInput = "input-([0-9]+)-([0-9]+)".r
   val Test = "test_(.*)".r
 
+  /** Converts a BlockId "name" String back into a BlockId. */
   def apply(id: String) = id match {
     case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
     case Shuffle(shuffleId, mapId, reduceId) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index cc0c46ec16..45f51da288 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -62,7 +62,7 @@ private[storage] object BlockManagerMessages {
 
     override def writeExternal(out: ObjectOutput) {
       blockManagerId.writeExternal(out)
-      out.writeUTF(blockId.filename)
+      out.writeUTF(blockId.name)
       storageLevel.writeExternal(out)
       out.writeLong(memSize)
       out.writeLong(diskSize)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 7e8ee2486e..80dcb5a207 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -117,9 +117,9 @@ private[spark] class BlockMessage() {
   def toBufferMessage: BufferMessage = {
     val startTime = System.currentTimeMillis
     val buffers = new ArrayBuffer[ByteBuffer]()
-    var buffer = ByteBuffer.allocate(4 + 4 + id.filename.length * 2)
-    buffer.putInt(typ).putInt(id.filename.length)
-    id.filename.foreach((x: Char) => buffer.putChar(x))
+    var buffer = ByteBuffer.allocate(4 + 4 + id.name.length * 2)
+    buffer.putInt(typ).putInt(id.name.length)
+    id.name.foreach((x: Char) => buffer.putChar(x))
     buffer.flip()
     buffers += buffer
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 4200935d93..c0b0076a67 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
       }
     }
 
-    new File(subDir, blockId.filename)
+    new File(subDir, blockId.asFilename)
   }
 
   private def createLocalDirs(): Array[File] = {
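Because BlockManagerMessages now writes blockId.name with writeUTF, the new doc comment on BlockId.apply matters: every name must parse back to its original BlockId. A round-trip sketch, assuming the classes from the BlockId.scala hunk above are in scope:

    val id: BlockId = ShuffleBlockId(1, 2, 3)
    val wire: String = id.name            // "shuffle_1_2_3", as written via writeUTF
    val parsed: BlockId = BlockId(wire)   // regex-matched back to ShuffleBlockId(1, 2, 3)
    assert(parsed == id)                  // same class, same name, so equal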
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 74e5deb35d..b83cd54f3c 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -46,7 +46,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
       "Executors")
 
     val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.
-      sortWith(_._1.filename < _._1.filename)
+      sortWith(_._1.name < _._1.name)
     val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
     val blocks = blockStatuses.map { case(id, status) =>
       (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index 538482f6ff..27f0dce9c9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -21,21 +21,21 @@ import org.scalatest.FunSuite
 
 class BlockIdSuite extends FunSuite {
   def assertSame(id1: BlockId, id2: BlockId) {
-    assert(id1.filename === id2.filename)
-    assert(id1.toString === id2.toString)
+    assert(id1.name === id2.name)
+    assert(id1.asFilename === id2.asFilename)
     assert(id1.hashCode === id2.hashCode)
     assert(id1 === id2)
   }
 
   def assertDifferent(id1: BlockId, id2: BlockId) {
-    assert(id1.filename != id2.filename)
-    assert(id1.toString != id2.toString)
+    assert(id1.name != id2.name)
+    assert(id1.asFilename != id2.asFilename)
     assert(id1.hashCode != id2.hashCode)
     assert(id1 != id2)
   }
 
   test("basic-functions") {
-    case class MyBlockId(filename: String) extends BlockId
+    case class MyBlockId(name: String) extends BlockId
 
     val id = MyBlockId("a")
     assertSame(id, MyBlockId("a"))
@@ -56,7 +56,7 @@ class BlockIdSuite extends FunSuite {
     val id = RDDBlockId(1, 2)
     assertSame(id, RDDBlockId(1, 2))
     assertDifferent(id, RDDBlockId(1, 1))
-    assert(id.toString === "rdd_1_2")
+    assert(id.name === "rdd_1_2")
     assert(id.asRDDId.get.rddId === 1)
     assert(id.asRDDId.get.splitIndex === 2)
     assert(id.isRDD)
@@ -67,7 +67,7 @@ class BlockIdSuite extends FunSuite {
     val id = ShuffleBlockId(1, 2, 3)
     assertSame(id, ShuffleBlockId(1, 2, 3))
     assertDifferent(id, ShuffleBlockId(3, 2, 3))
-    assert(id.toString === "shuffle_1_2_3")
+    assert(id.name === "shuffle_1_2_3")
     assert(id.asRDDId === None)
     assert(id.shuffleId === 1)
     assert(id.mapId === 2)
@@ -80,7 +80,7 @@ class BlockIdSuite extends FunSuite {
     val id = BroadcastBlockId(42)
     assertSame(id, BroadcastBlockId(42))
    assertDifferent(id, BroadcastBlockId(123))
-    assert(id.toString === "broadcast_42")
+    assert(id.name === "broadcast_42")
     assert(id.asRDDId === None)
     assert(id.broadcastId === 42)
     assert(id.isBroadcast)
@@ -91,7 +91,7 @@ class BlockIdSuite extends FunSuite {
     val id = TaskResultBlockId(60)
     assertSame(id, TaskResultBlockId(60))
     assertDifferent(id, TaskResultBlockId(61))
-    assert(id.toString === "taskresult_60")
+    assert(id.name === "taskresult_60")
     assert(id.asRDDId === None)
     assert(id.taskId === 60)
     assert(!id.isRDD)
@@ -102,7 +102,7 @@ class BlockIdSuite extends FunSuite {
     val id = StreamBlockId(1, 100)
     assertSame(id, StreamBlockId(1, 100))
     assertDifferent(id, StreamBlockId(2, 101))
-    assert(id.toString === "input-1-100")
+    assert(id.name === "input-1-100")
     assert(id.asRDDId === None)
     assert(id.streamId === 1)
     assert(id.uniqueId === 100)
@@ -114,7 +114,7 @@ class BlockIdSuite extends FunSuite {
     val id = TestBlockId("abc")
     assertSame(id, TestBlockId("abc"))
     assertDifferent(id, TestBlockId("ab"))
-    assert(id.toString === "test_abc")
+    assert(id.name === "test_abc")
     assert(id.asRDDId === None)
     assert(id.id === "abc")
     assert(!id.isShuffle)
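One behavioral change worth noting: equals now requires getClass equality in addition to name equality. Under the old definition, any two BlockId subclasses that happened to render the same name string compared equal. A small illustration, reusing the MyBlockId pattern from the BlockIdSuite diff above (the classes must live inside the spark package, since BlockId is private[spark]):

    case class MyBlockId(name: String) extends BlockId  // as in BlockIdSuite

    val a = MyBlockId("test_abc")   // name collides with TestBlockId("abc").name
    val b = TestBlockId("abc")      // also renders as "test_abc"
    assert(a.name == b.name)        // identical name strings...
    assert(a != b)                  // ...but no longer equal, due to the getClass check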