diff options
-rw-r--r-- | src/scala/spark/Broadcast.scala | 482 | ||||
-rw-r--r-- | third_party/FreePastry-2.1.jar | bin | 2895564 -> 0 bytes |
2 files changed, 2 insertions, 480 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 88a0dde3e7..3e49f7ea5d 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -15,19 +15,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} import spark.compress.lzf.{LZFInputStream, LZFOutputStream} -//import rice.environment.Environment -//import rice.p2p.commonapi._ -//import rice.p2p.commonapi.rawserialization.RawMessage -//import rice.pastry._ -//import rice.pastry.commonapi.PastryIdFactory -//import rice.pastry.direct._ -//import rice.pastry.socket.SocketPastryNodeFactory -//import rice.pastry.standard.RandomNodeIdFactory -//import rice.p2p.scribe._ -//import rice.p2p.splitstream._ - @serializable -trait BroadcastRecipe { +trait Broadcast { val uuid = UUID.randomUUID // We cannot have an abstract readObject here due to some weird issues with @@ -39,7 +28,7 @@ trait BroadcastRecipe { @serializable class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) -extends BroadcastRecipe with Logging { +extends Broadcast with Logging { def value = value_ @@ -138,9 +127,6 @@ extends BroadcastRecipe with Logging { if (cachedVal != null) { value_ = cachedVal.asInstanceOf[T] } else { - // Only a single worker (the first one) in the same node can ever be - // here. The rest will always get the value ready - // Initializing everything because Master will only send null/0 values initializeSlaveVariables @@ -762,73 +748,6 @@ extends BroadcastRecipe with Logging { } } -//@serializable -//class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) -//extends BroadcastRecipe with Logging { - -// def value = value_ - -// BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) } -// -// if (!local) { -// sendBroadcast -// } -// -// @transient var publishThread: PublishThread = null -// @transient var hasCopyInHDFS = false -// -// def sendBroadcast () { -// // Store a persistent copy in HDFS -// val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) -// out.writeObject (value_) -// out.close -// hasCopyInHDFS = true -// -// publishThread = new PublishThread -// publishThread.start -// } -// -// private def readObject (in: ObjectInputStream) { -// in.defaultReadObject -// BroadcastSS.synchronized { -// val cachedVal = BroadcastSS.values.get(uuid) -// if (cachedVal != null) { -// value_ = cachedVal.asInstanceOf[T] -// } else { -// val start = System.nanoTime - -// // Thread.sleep (5000) // TODO: -// val receptionSucceeded = BroadcastSS.receiveVariable (uuid) -// // If does not succeed, then get from HDFS copy -// if (receptionSucceeded) { -// value_ = BroadcastSS.values.get(uuid).asInstanceOf[T] -// } else { -// logInfo ("Reading from HDFS") -// val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) -// value_ = fileIn.readObject.asInstanceOf[T] -// BroadcastSS.values.put(uuid, value_) -// fileIn.close -// } -// -// val time = (System.nanoTime - start) / 1e9 -// logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s") -// } -// } -// } -// -// class PublishThread -// extends Thread with Logging { -// override def run = { -// // TODO: Put some delay here to give time others to register -// // Thread.sleep (5000) -// logInfo ("Waited. Now sending...") -// BroadcastSS.synchronized { -// BroadcastSS.publishVariable[T] (uuid, value) -// } -// } -// } -//} - @serializable class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) extends BroadcastRecipe with Logging { @@ -885,23 +804,6 @@ extends Comparable [SourceInfo] with Logging { // Ascending sort based on leecher count def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers) - - // Descending sort based on speed - // def compareTo (o: SourceInfo): Int = { - // if (MBps > o.MBps) -1 - // else if (MBps < o.MBps) 1 - // else 0 - // } - - // Descending sort based on globally stored speed - // def compareTo (o: SourceInfo): Int = { - // val mySpeed = BroadcastCS.getSourceSpeed (hostAddress) - // val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress) - - // if (mySpeed > urSpeed) -1 - // else if (mySpeed < urSpeed) 1 - // else 0 - // } } object SourceInfo { @@ -936,9 +838,6 @@ extends Logging { BroadcastCH.initialize // Initialization for ChainedStreamingBroadcast BroadcastCS.initialize (isMaster) - // Initialization for SplitStreamBroadcast - // TODO: SplitStream turned OFF - // BroadcastSS.initialize (isMaster) initialized = true } @@ -1158,383 +1057,6 @@ extends Logging { } } -//private object BroadcastSS { -// val values = new MapMaker ().softValues ().makeMap[UUID, Any] - -// private val valueBytes = new MapMaker().softValues().makeMap[UUID,Array[Byte]] - -// private var initialized = false -// private var isMaster_ = false -// -// private var masterBootHost_ = "127.0.0.1" -// private var masterBootPort_ : Int = 22222 -// private var blockSize_ : Int = 512 * 1024 -// private var MaxRetryCount_ : Int = 2 -// -// private var masterBootAddress_ : InetSocketAddress = null -// private var localBindPort_ : Int = -1 -// -// private var pEnvironment_ : Environment = null -// private var pastryNode_ : PastryNode = null -// private var ssClient: SSClient = null -// -// // Current transmission state variables -// private var curUUID: UUID = null -// private var curTotalBlocks = -1 -// private var curTotalBytes = -1 -// private var curHasBlocks = -1 -// private var curBlockBitmap: Array[Boolean] = null -// private var curArrayOfBytes: Array[Byte] = null -// -// // TODO: Add stuff so that we can handle out of order variable broadcast - -// def initialize (isMaster__ : Boolean) { -// synchronized { -// if (!initialized) { -// masterBootHost_ = -// System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1") -// masterBootPort_ = -// System.getProperty ("spark.broadcast.masterBootPort", "22222").toInt -// -// masterBootAddress_ = new InetSocketAddress(masterBootHost_, -// masterBootPort_) -// -// blockSize_ = -// System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 -// MaxRetryCount_ = -// System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt -// -// isMaster_ = isMaster__ -// -// // Initialize the SplitStream tree -// initializeSplitStream -// -// initialized = true -// } -// } -// } -// -// def masterBootAddress = masterBootAddress_ -// def blockSize = blockSize_ -// def MaxRetryCount = MaxRetryCount_ -// -// def pEnvironment: Environment = { -// if (pEnvironment_ == null) { -// initializeSplitStream -// } -// pEnvironment_ -// } -// -// def pastryNode: PastryNode = { -// if (pastryNode_ == null) { -// initializeSplitStream -// } -// pastryNode_ -// } -// -// def localBindPort = { -// if (localBindPort_ == -1) { -// if (isMaster) { -// localBindPort_ = masterBootPort_ -// } -// else { -// // TODO: What's the best way of finding a free port? -// val sSocket = new ServerSocket (0) -// val sPort = sSocket.getLocalPort -// sSocket.close -// localBindPort_ = sPort -// } -// } -// localBindPort_ -// } - -// def isMaster = isMaster_ -// -// private def initializeSplitStream = { -// pEnvironment_ = new Environment -// -// // Generate the NodeIds Randomly -// val nidFactory = new RandomNodeIdFactory (pEnvironment) -// -// // Construct the PastryNodeFactory -// val pastryNodeFactory = new SocketPastryNodeFactory (nidFactory, -// localBindPort, pEnvironment) -// -// // Construct a Pastry node -// pastryNode_ = pastryNodeFactory.newNode -// -// // Boot the node. -// pastryNode.boot (masterBootAddress) -// // TODO: Some unknown messages are dropped in slaves at this point -// -// // The node may require sending several messages to fully boot into the ring -// pastryNode.synchronized { -// while(!pastryNode.isReady && !pastryNode.joinFailed) { -// // Delay so we don't busy-wait -// pastryNode.wait (500) -// -// // Abort if can't join -// if (pastryNode.joinFailed()) { -// // TODO: throw new IOException("Join failed " + node.joinFailedReason) -// } -// } -// } -// -// // Create the SplitStream client and subscribe -// ssClient = new SSClient (BroadcastSS.pastryNode) -// ssClient.subscribe -// } -// -// def publishVariable[A] (uuid: UUID, obj: A) = { -// ssClient.synchronized { -// ssClient.publish[A] (uuid, obj) -// } -// } -// -// // Return status of the reception -// def receiveVariable[A] (uuid: UUID): Boolean = { -// // TODO: Things will change if out-of-order variable recepetion is supported -// -// logInfo ("In receiveVariable") -// -// // Check in valueBytes -// if (xferValueBytesToValues[A] (uuid)) { -// return true -// } -// -// // Check if its in progress -// for (i <- 0 until MaxRetryCount) { -// logInfo (uuid + " " + curUUID) -// while (uuid == curUUID) { -// Thread.sleep (100) // TODO: How long to sleep -// } -// if (xferValueBytesToValues[A] (uuid)) { -// return true -// } -// -// // Wait for a while to see if we've reached here before xmission started -// Thread.sleep (100) -// } -// return false -// } -// -// private def xferValueBytesToValues[A] (uuid: UUID): Boolean = { -// var cachedValueBytes: Array[Byte] = null -// valueBytes.synchronized { -// cachedValueBytes = valueBytes.get (uuid) -// } -// if (cachedValueBytes != null) { -// val cachedValue = byteArrayToObject[A] (cachedValueBytes) -// values.synchronized { -// values.put (uuid, cachedValue) -// } -// return true -// } -// return false -// } -// -// private def objectToByteArray[A] (obj: A): Array[Byte] = { -// val baos = new ByteArrayOutputStream -// val oos = new ObjectOutputStream (baos) -// oos.writeObject (obj) -// oos.close -// baos.close -// return baos.toByteArray -// } - -// private def byteArrayToObject[A] (bytes: Array[Byte]): A = { -// val in = new ObjectInputStream (new ByteArrayInputStream (bytes)) -// val retVal = in.readObject.asInstanceOf[A] -// in.close -// return retVal -// } - -// private def intToByteArray (value: Int): Array[Byte] = { -// var retVal = new Array[Byte] (4) -// for (i <- 0 until 4) { -// retVal(i) = (value >> ((4 - 1 - i) * 8)).toByte -// } -// return retVal -// } - -// private def byteArrayToInt (arr: Array[Byte], offset: Int): Int = { -// var retVal = 0 -// for (i <- 0 until 4) { -// retVal += ((arr(i + offset).toInt & 0x000000FF) << ((4 - 1 - i) * 8)) -// } -// return retVal -// } - -// class SSClient (pastryNode: PastryNode) -// extends SplitStreamClient -// with Application { -// // Magic bits: 11111100001100100100110000111111 -// val magicBits = 0xFC324C3F -// -// // Message Types -// val INFO_MSG = 1 -// val DATA_MSG = 2 -// -// // The Endpoint represents the underlying node. By making calls on the -// // Endpoint, it assures that the message will be delivered to the App on -// // whichever node the message is intended for. -// protected val endPoint = pastryNode.buildEndpoint (this, "myInstance") - -// // Handle to a SplitStream implementation -// val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream") - -// // The ChannelId is constructed from a normal PastryId based on the UUID -// val myChannelId = new ChannelId (new PastryIdFactory -// (pastryNode.getEnvironment).buildId ("myChannel")) -// -// // The channel -// var myChannel: Channel = null -// -// // The stripes. Acquired from myChannel. -// var myStripes: Array[Stripe] = null - -// // Now we can receive messages -// endPoint.register -// -// // Subscribes to all stripes in myChannelId. -// def subscribe = { -// // Attaching makes you part of the Channel, and volunteers to be an -// // internal node of one of the trees -// myChannel = mySplitStream.attachChannel (myChannelId) -// -// // Subscribing notifies your application when data comes through the tree -// myStripes = myChannel.getStripes -// for (curStripe <- myStripes) { -// curStripe.subscribe (this) -// } -// } -// -// // Part of SplitStreamClient. Called when a published message is received. -// def deliver (s: Stripe, data: Array[Byte]) = { -// // Unpack and verify magicBits -// val topLevelInfo = byteArrayToObject[(Int, Int, Array[Byte])] (data) -// -// // Process only if magicBits are OK -// if (topLevelInfo._1 == magicBits) { -// // Process only for slaves -// if (!BroadcastSS.isMaster) { -// // Match on Message Type -// topLevelInfo._2 match { -// case INFO_MSG => { -// val realInfo = byteArrayToObject[(UUID, Int, Int)] ( -// topLevelInfo._3) -// -// // Setup states for impending transmission -// curUUID = realInfo._1 // TODO: -// curTotalBlocks = realInfo._2 -// curTotalBytes = realInfo._3 -// -// curHasBlocks = 0 -// curBlockBitmap = new Array[Boolean] (curTotalBlocks) -// curArrayOfBytes = new Array[Byte] (curTotalBytes) -// -// logInfo (curUUID + " " + curTotalBlocks + " " + curTotalBytes) -// } -// case DATA_MSG => { -// val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] ( -// topLevelInfo._3) -// val blockUUID = realInfo._1 -// val blockIndex = realInfo._2 -// val blockData = realInfo._3 -// -// // TODO: Will change in future implementation. Right now we -// // require broadcast in order on the variable level. Blocks can -// // come out of order though -// assert (blockUUID == curUUID) -// -// // Update everything -// curHasBlocks += 1 -// curBlockBitmap(blockIndex) = true -// System.arraycopy (blockData, 0, curArrayOfBytes, -// blockIndex * blockSize, blockData.length) -// -// logInfo ("Got stuff for: " + blockUUID) -// -// // Done receiving -// if (curHasBlocks == curTotalBlocks) { -// // Store as a Array[Byte] -// valueBytes.synchronized { -// valueBytes.put (curUUID, curArrayOfBytes) -// } -// -// logInfo ("Finished reading. Stored in valueBytes") -// -// // RESET -// curUUID = null -// } -// } -// case _ => { -// // Should never happen -// } -// } -// } -// } -// } - -// // Multicasts data. -// def publish[A] (uuid: UUID, obj: A) = { -// val byteArray = objectToByteArray[A] (obj) -// -// var blockNum = (byteArray.length / blockSize) -// if (byteArray.length % blockSize != 0) -// blockNum += 1 -// -// // ------------------------------------- -// // INFO_MSG: | UUID | Total Blocks | Total Bytes | -// // ------------------------------------- -// var infoByteArray = objectToByteArray[(UUID, Int, Int)] ((uuid, blockNum, -// byteArray.length)) -// doPublish (0, INFO_MSG, infoByteArray) -// -// // ------------------------------------- -// // DATA_MSG: | UUID | Block Index | Single Block | -// // ------------------------------------- -// var blockID = 0 -// for (i <- 0 until (byteArray.length, blockSize)) { -// val thisBlockSize = Math.min (blockSize, byteArray.length - i) -// var thisBlockData = new Array[Byte] (thisBlockSize) -// System.arraycopy (byteArray, i * blockSize, thisBlockData, 0, -// thisBlockSize) - -// var dataByteArray = objectToByteArray[(UUID, Int, Array[Byte])] ((uuid, -// blockID, thisBlockData)) -// doPublish (blockID % myStripes.length, DATA_MSG, dataByteArray) - -// blockID += 1 -// } -// } -// -// // -------------------------------- -// // Message Format: | MagicBits | Type | Real Data | -// // -------------------------------- -// private def doPublish (stripeID: Int, msgType: Int, data: Array[Byte]) = { -// val bytesToSend = objectToByteArray[(Int, Int, Array[Byte])] ((magicBits, -// msgType, data)) -// myStripes(stripeID).publish (bytesToSend) -// } - -// /* class PublishContent -// extends Message { -// def getPriority: Int = { Message.MEDIUM_PRIORITY } -// } */ -// -// // Error handling -// def joinFailed(s: Stripe) = { -// logInfo ("joinFailed(" + s + ")") -// } -// -// // Rest of the Application interface. NOT USED. -// def deliver (id: rice.p2p.commonapi.Id, message: Message) = { } -// def forward (message: RouteMessage): Boolean = false -// def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { } -// } -//} - private object BroadcastCH extends Logging { val values = new MapMaker ().softValues ().makeMap[UUID, Any] diff --git a/third_party/FreePastry-2.1.jar b/third_party/FreePastry-2.1.jar Binary files differdeleted file mode 100644 index 51146e1541..0000000000 --- a/third_party/FreePastry-2.1.jar +++ /dev/null |