aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-26 22:06:43 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-26 22:06:43 -0800
commit9c8a65b920d434e51bfb6dc7eccc71e65b3a3c3d (patch)
tree44c6911d074f821a50fe8c0f2974afd569e3683b
parent690b55917d1d32439b7a58cbdd6ca567f06414b1 (diff)
downloadspark-9c8a65b920d434e51bfb6dc7eccc71e65b3a3c3d.tar.gz
spark-9c8a65b920d434e51bfb6dc7eccc71e65b3a3c3d.tar.bz2
spark-9c8a65b920d434e51bfb6dc7eccc71e65b3a3c3d.zip
Removed everything related to SplitStreaming...
-rw-r--r--src/scala/spark/Broadcast.scala482
-rw-r--r--third_party/FreePastry-2.1.jarbin2895564 -> 0 bytes
2 files changed, 2 insertions, 480 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 88a0dde3e7..3e49f7ea5d 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -15,19 +15,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
-//import rice.environment.Environment
-//import rice.p2p.commonapi._
-//import rice.p2p.commonapi.rawserialization.RawMessage
-//import rice.pastry._
-//import rice.pastry.commonapi.PastryIdFactory
-//import rice.pastry.direct._
-//import rice.pastry.socket.SocketPastryNodeFactory
-//import rice.pastry.standard.RandomNodeIdFactory
-//import rice.p2p.scribe._
-//import rice.p2p.splitstream._
-
@serializable
-trait BroadcastRecipe {
+trait Broadcast {
val uuid = UUID.randomUUID
// We cannot have an abstract readObject here due to some weird issues with
@@ -39,7 +28,7 @@ trait BroadcastRecipe {
@serializable
class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
-extends BroadcastRecipe with Logging {
+extends Broadcast with Logging {
def value = value_
@@ -138,9 +127,6 @@ extends BroadcastRecipe with Logging {
if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T]
} else {
- // Only a single worker (the first one) in the same node can ever be
- // here. The rest will always get the value ready
-
// Initializing everything because Master will only send null/0 values
initializeSlaveVariables
@@ -762,73 +748,6 @@ extends BroadcastRecipe with Logging {
}
}
-//@serializable
-//class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
-//extends BroadcastRecipe with Logging {
-
-// def value = value_
-
-// BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) }
-//
-// if (!local) {
-// sendBroadcast
-// }
-//
-// @transient var publishThread: PublishThread = null
-// @transient var hasCopyInHDFS = false
-//
-// def sendBroadcast () {
-// // Store a persistent copy in HDFS
-// val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
-// out.writeObject (value_)
-// out.close
-// hasCopyInHDFS = true
-//
-// publishThread = new PublishThread
-// publishThread.start
-// }
-//
-// private def readObject (in: ObjectInputStream) {
-// in.defaultReadObject
-// BroadcastSS.synchronized {
-// val cachedVal = BroadcastSS.values.get(uuid)
-// if (cachedVal != null) {
-// value_ = cachedVal.asInstanceOf[T]
-// } else {
-// val start = System.nanoTime
-
-// // Thread.sleep (5000) // TODO:
-// val receptionSucceeded = BroadcastSS.receiveVariable (uuid)
-// // If does not succeed, then get from HDFS copy
-// if (receptionSucceeded) {
-// value_ = BroadcastSS.values.get(uuid).asInstanceOf[T]
-// } else {
-// logInfo ("Reading from HDFS")
-// val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
-// value_ = fileIn.readObject.asInstanceOf[T]
-// BroadcastSS.values.put(uuid, value_)
-// fileIn.close
-// }
-//
-// val time = (System.nanoTime - start) / 1e9
-// logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
-// }
-// }
-// }
-//
-// class PublishThread
-// extends Thread with Logging {
-// override def run = {
-// // TODO: Put some delay here to give time others to register
-// // Thread.sleep (5000)
-// logInfo ("Waited. Now sending...")
-// BroadcastSS.synchronized {
-// BroadcastSS.publishVariable[T] (uuid, value)
-// }
-// }
-// }
-//}
-
@serializable
class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
extends BroadcastRecipe with Logging {
@@ -885,23 +804,6 @@ extends Comparable [SourceInfo] with Logging {
// Ascending sort based on leecher count
def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
-
- // Descending sort based on speed
- // def compareTo (o: SourceInfo): Int = {
- // if (MBps > o.MBps) -1
- // else if (MBps < o.MBps) 1
- // else 0
- // }
-
- // Descending sort based on globally stored speed
- // def compareTo (o: SourceInfo): Int = {
- // val mySpeed = BroadcastCS.getSourceSpeed (hostAddress)
- // val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress)
-
- // if (mySpeed > urSpeed) -1
- // else if (mySpeed < urSpeed) 1
- // else 0
- // }
}
object SourceInfo {
@@ -936,9 +838,6 @@ extends Logging {
BroadcastCH.initialize
// Initialization for ChainedStreamingBroadcast
BroadcastCS.initialize (isMaster)
- // Initialization for SplitStreamBroadcast
- // TODO: SplitStream turned OFF
- // BroadcastSS.initialize (isMaster)
initialized = true
}
@@ -1158,383 +1057,6 @@ extends Logging {
}
}
-//private object BroadcastSS {
-// val values = new MapMaker ().softValues ().makeMap[UUID, Any]
-
-// private val valueBytes = new MapMaker().softValues().makeMap[UUID,Array[Byte]]
-
-// private var initialized = false
-// private var isMaster_ = false
-//
-// private var masterBootHost_ = "127.0.0.1"
-// private var masterBootPort_ : Int = 22222
-// private var blockSize_ : Int = 512 * 1024
-// private var MaxRetryCount_ : Int = 2
-//
-// private var masterBootAddress_ : InetSocketAddress = null
-// private var localBindPort_ : Int = -1
-//
-// private var pEnvironment_ : Environment = null
-// private var pastryNode_ : PastryNode = null
-// private var ssClient: SSClient = null
-//
-// // Current transmission state variables
-// private var curUUID: UUID = null
-// private var curTotalBlocks = -1
-// private var curTotalBytes = -1
-// private var curHasBlocks = -1
-// private var curBlockBitmap: Array[Boolean] = null
-// private var curArrayOfBytes: Array[Byte] = null
-//
-// // TODO: Add stuff so that we can handle out of order variable broadcast
-
-// def initialize (isMaster__ : Boolean) {
-// synchronized {
-// if (!initialized) {
-// masterBootHost_ =
-// System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1")
-// masterBootPort_ =
-// System.getProperty ("spark.broadcast.masterBootPort", "22222").toInt
-//
-// masterBootAddress_ = new InetSocketAddress(masterBootHost_,
-// masterBootPort_)
-//
-// blockSize_ =
-// System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
-// MaxRetryCount_ =
-// System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt
-//
-// isMaster_ = isMaster__
-//
-// // Initialize the SplitStream tree
-// initializeSplitStream
-//
-// initialized = true
-// }
-// }
-// }
-//
-// def masterBootAddress = masterBootAddress_
-// def blockSize = blockSize_
-// def MaxRetryCount = MaxRetryCount_
-//
-// def pEnvironment: Environment = {
-// if (pEnvironment_ == null) {
-// initializeSplitStream
-// }
-// pEnvironment_
-// }
-//
-// def pastryNode: PastryNode = {
-// if (pastryNode_ == null) {
-// initializeSplitStream
-// }
-// pastryNode_
-// }
-//
-// def localBindPort = {
-// if (localBindPort_ == -1) {
-// if (isMaster) {
-// localBindPort_ = masterBootPort_
-// }
-// else {
-// // TODO: What's the best way of finding a free port?
-// val sSocket = new ServerSocket (0)
-// val sPort = sSocket.getLocalPort
-// sSocket.close
-// localBindPort_ = sPort
-// }
-// }
-// localBindPort_
-// }
-
-// def isMaster = isMaster_
-//
-// private def initializeSplitStream = {
-// pEnvironment_ = new Environment
-//
-// // Generate the NodeIds Randomly
-// val nidFactory = new RandomNodeIdFactory (pEnvironment)
-//
-// // Construct the PastryNodeFactory
-// val pastryNodeFactory = new SocketPastryNodeFactory (nidFactory,
-// localBindPort, pEnvironment)
-//
-// // Construct a Pastry node
-// pastryNode_ = pastryNodeFactory.newNode
-//
-// // Boot the node.
-// pastryNode.boot (masterBootAddress)
-// // TODO: Some unknown messages are dropped in slaves at this point
-//
-// // The node may require sending several messages to fully boot into the ring
-// pastryNode.synchronized {
-// while(!pastryNode.isReady && !pastryNode.joinFailed) {
-// // Delay so we don't busy-wait
-// pastryNode.wait (500)
-//
-// // Abort if can't join
-// if (pastryNode.joinFailed()) {
-// // TODO: throw new IOException("Join failed " + node.joinFailedReason)
-// }
-// }
-// }
-//
-// // Create the SplitStream client and subscribe
-// ssClient = new SSClient (BroadcastSS.pastryNode)
-// ssClient.subscribe
-// }
-//
-// def publishVariable[A] (uuid: UUID, obj: A) = {
-// ssClient.synchronized {
-// ssClient.publish[A] (uuid, obj)
-// }
-// }
-//
-// // Return status of the reception
-// def receiveVariable[A] (uuid: UUID): Boolean = {
-// // TODO: Things will change if out-of-order variable recepetion is supported
-//
-// logInfo ("In receiveVariable")
-//
-// // Check in valueBytes
-// if (xferValueBytesToValues[A] (uuid)) {
-// return true
-// }
-//
-// // Check if its in progress
-// for (i <- 0 until MaxRetryCount) {
-// logInfo (uuid + " " + curUUID)
-// while (uuid == curUUID) {
-// Thread.sleep (100) // TODO: How long to sleep
-// }
-// if (xferValueBytesToValues[A] (uuid)) {
-// return true
-// }
-//
-// // Wait for a while to see if we've reached here before xmission started
-// Thread.sleep (100)
-// }
-// return false
-// }
-//
-// private def xferValueBytesToValues[A] (uuid: UUID): Boolean = {
-// var cachedValueBytes: Array[Byte] = null
-// valueBytes.synchronized {
-// cachedValueBytes = valueBytes.get (uuid)
-// }
-// if (cachedValueBytes != null) {
-// val cachedValue = byteArrayToObject[A] (cachedValueBytes)
-// values.synchronized {
-// values.put (uuid, cachedValue)
-// }
-// return true
-// }
-// return false
-// }
-//
-// private def objectToByteArray[A] (obj: A): Array[Byte] = {
-// val baos = new ByteArrayOutputStream
-// val oos = new ObjectOutputStream (baos)
-// oos.writeObject (obj)
-// oos.close
-// baos.close
-// return baos.toByteArray
-// }
-
-// private def byteArrayToObject[A] (bytes: Array[Byte]): A = {
-// val in = new ObjectInputStream (new ByteArrayInputStream (bytes))
-// val retVal = in.readObject.asInstanceOf[A]
-// in.close
-// return retVal
-// }
-
-// private def intToByteArray (value: Int): Array[Byte] = {
-// var retVal = new Array[Byte] (4)
-// for (i <- 0 until 4) {
-// retVal(i) = (value >> ((4 - 1 - i) * 8)).toByte
-// }
-// return retVal
-// }
-
-// private def byteArrayToInt (arr: Array[Byte], offset: Int): Int = {
-// var retVal = 0
-// for (i <- 0 until 4) {
-// retVal += ((arr(i + offset).toInt & 0x000000FF) << ((4 - 1 - i) * 8))
-// }
-// return retVal
-// }
-
-// class SSClient (pastryNode: PastryNode)
-// extends SplitStreamClient
-// with Application {
-// // Magic bits: 11111100001100100100110000111111
-// val magicBits = 0xFC324C3F
-//
-// // Message Types
-// val INFO_MSG = 1
-// val DATA_MSG = 2
-//
-// // The Endpoint represents the underlying node. By making calls on the
-// // Endpoint, it assures that the message will be delivered to the App on
-// // whichever node the message is intended for.
-// protected val endPoint = pastryNode.buildEndpoint (this, "myInstance")
-
-// // Handle to a SplitStream implementation
-// val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream")
-
-// // The ChannelId is constructed from a normal PastryId based on the UUID
-// val myChannelId = new ChannelId (new PastryIdFactory
-// (pastryNode.getEnvironment).buildId ("myChannel"))
-//
-// // The channel
-// var myChannel: Channel = null
-//
-// // The stripes. Acquired from myChannel.
-// var myStripes: Array[Stripe] = null
-
-// // Now we can receive messages
-// endPoint.register
-//
-// // Subscribes to all stripes in myChannelId.
-// def subscribe = {
-// // Attaching makes you part of the Channel, and volunteers to be an
-// // internal node of one of the trees
-// myChannel = mySplitStream.attachChannel (myChannelId)
-//
-// // Subscribing notifies your application when data comes through the tree
-// myStripes = myChannel.getStripes
-// for (curStripe <- myStripes) {
-// curStripe.subscribe (this)
-// }
-// }
-//
-// // Part of SplitStreamClient. Called when a published message is received.
-// def deliver (s: Stripe, data: Array[Byte]) = {
-// // Unpack and verify magicBits
-// val topLevelInfo = byteArrayToObject[(Int, Int, Array[Byte])] (data)
-//
-// // Process only if magicBits are OK
-// if (topLevelInfo._1 == magicBits) {
-// // Process only for slaves
-// if (!BroadcastSS.isMaster) {
-// // Match on Message Type
-// topLevelInfo._2 match {
-// case INFO_MSG => {
-// val realInfo = byteArrayToObject[(UUID, Int, Int)] (
-// topLevelInfo._3)
-//
-// // Setup states for impending transmission
-// curUUID = realInfo._1 // TODO:
-// curTotalBlocks = realInfo._2
-// curTotalBytes = realInfo._3
-//
-// curHasBlocks = 0
-// curBlockBitmap = new Array[Boolean] (curTotalBlocks)
-// curArrayOfBytes = new Array[Byte] (curTotalBytes)
-//
-// logInfo (curUUID + " " + curTotalBlocks + " " + curTotalBytes)
-// }
-// case DATA_MSG => {
-// val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] (
-// topLevelInfo._3)
-// val blockUUID = realInfo._1
-// val blockIndex = realInfo._2
-// val blockData = realInfo._3
-//
-// // TODO: Will change in future implementation. Right now we
-// // require broadcast in order on the variable level. Blocks can
-// // come out of order though
-// assert (blockUUID == curUUID)
-//
-// // Update everything
-// curHasBlocks += 1
-// curBlockBitmap(blockIndex) = true
-// System.arraycopy (blockData, 0, curArrayOfBytes,
-// blockIndex * blockSize, blockData.length)
-//
-// logInfo ("Got stuff for: " + blockUUID)
-//
-// // Done receiving
-// if (curHasBlocks == curTotalBlocks) {
-// // Store as a Array[Byte]
-// valueBytes.synchronized {
-// valueBytes.put (curUUID, curArrayOfBytes)
-// }
-//
-// logInfo ("Finished reading. Stored in valueBytes")
-//
-// // RESET
-// curUUID = null
-// }
-// }
-// case _ => {
-// // Should never happen
-// }
-// }
-// }
-// }
-// }
-
-// // Multicasts data.
-// def publish[A] (uuid: UUID, obj: A) = {
-// val byteArray = objectToByteArray[A] (obj)
-//
-// var blockNum = (byteArray.length / blockSize)
-// if (byteArray.length % blockSize != 0)
-// blockNum += 1
-//
-// // -------------------------------------
-// // INFO_MSG: | UUID | Total Blocks | Total Bytes |
-// // -------------------------------------
-// var infoByteArray = objectToByteArray[(UUID, Int, Int)] ((uuid, blockNum,
-// byteArray.length))
-// doPublish (0, INFO_MSG, infoByteArray)
-//
-// // -------------------------------------
-// // DATA_MSG: | UUID | Block Index | Single Block |
-// // -------------------------------------
-// var blockID = 0
-// for (i <- 0 until (byteArray.length, blockSize)) {
-// val thisBlockSize = Math.min (blockSize, byteArray.length - i)
-// var thisBlockData = new Array[Byte] (thisBlockSize)
-// System.arraycopy (byteArray, i * blockSize, thisBlockData, 0,
-// thisBlockSize)
-
-// var dataByteArray = objectToByteArray[(UUID, Int, Array[Byte])] ((uuid,
-// blockID, thisBlockData))
-// doPublish (blockID % myStripes.length, DATA_MSG, dataByteArray)
-
-// blockID += 1
-// }
-// }
-//
-// // --------------------------------
-// // Message Format: | MagicBits | Type | Real Data |
-// // --------------------------------
-// private def doPublish (stripeID: Int, msgType: Int, data: Array[Byte]) = {
-// val bytesToSend = objectToByteArray[(Int, Int, Array[Byte])] ((magicBits,
-// msgType, data))
-// myStripes(stripeID).publish (bytesToSend)
-// }
-
-// /* class PublishContent
-// extends Message {
-// def getPriority: Int = { Message.MEDIUM_PRIORITY }
-// } */
-//
-// // Error handling
-// def joinFailed(s: Stripe) = {
-// logInfo ("joinFailed(" + s + ")")
-// }
-//
-// // Rest of the Application interface. NOT USED.
-// def deliver (id: rice.p2p.commonapi.Id, message: Message) = { }
-// def forward (message: RouteMessage): Boolean = false
-// def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { }
-// }
-//}
-
private object BroadcastCH
extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
diff --git a/third_party/FreePastry-2.1.jar b/third_party/FreePastry-2.1.jar
deleted file mode 100644
index 51146e1541..0000000000
--- a/third_party/FreePastry-2.1.jar
+++ /dev/null
Binary files differ