aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-06 19:37:27 -0700
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-06 19:37:27 -0700
commitcbce44304c7c8219a6fee69116bc5bdde83e1018 (patch)
tree61879589931b2443c6fdf0ee7c772a9a5c8d8127
parentc37c74919f31ec69e359786bc70d40b7d7319e22 (diff)
downloadspark-cbce44304c7c8219a6fee69116bc5bdde83e1018.tar.gz
spark-cbce44304c7c8219a6fee69116bc5bdde83e1018.tar.bz2
spark-cbce44304c7c8219a6fee69116bc5bdde83e1018.zip
Added exception handling to 'sendStopBroadcastNotifications'
-rw-r--r--src/scala/spark/Broadcast.scala65
1 files changed, 40 insertions, 25 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 0a49514933..4a13d576b1 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -312,7 +312,7 @@ extends BroadcastRecipe with Logging {
// Keep exchaning information until all blocks have been received
while (hasBlocks < totalBlocks) {
talkOnce
- Thread.sleep ( BroadcastBT.ranGen.nextInt (
+ Thread.sleep (BroadcastBT.ranGen.nextInt (
BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) +
BroadcastBT.MinKnockInterval)
}
@@ -387,7 +387,7 @@ extends BroadcastRecipe with Logging {
}
}
- Thread.sleep ( BroadcastBT.ranGen.nextInt (
+ Thread.sleep (BroadcastBT.ranGen.nextInt (
BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) +
BroadcastBT.MinKnockInterval)
@@ -403,8 +403,8 @@ extends BroadcastRecipe with Logging {
if (gInfo.listenPort == SourceInfo.TxOverGoToHDFS ||
gInfo.listenPort == SourceInfo.TxNotStartedRetry) {
- // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go
- // to HDFS anyway when receiveBroadcast returns false
+ // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go
+ // to HDFS anyway when receiveBroadcast returns false
return false
}
@@ -699,26 +699,41 @@ extends BroadcastRecipe with Logging {
private def sendStopBroadcastNotifications = {
listOfSources.synchronized {
listOfSources.foreach { sourceInfo =>
- // Connect to the source
- var guideSocketToSource =
- new Socket (sourceInfo.hostAddress, sourceInfo.listenPort)
- var gosSource =
- new ObjectOutputStream (guideSocketToSource.getOutputStream)
- gosSource.flush
- var gisSource =
- new ObjectInputStream (guideSocketToSource.getInputStream)
-
- // Throw away whatever comes in
- gisSource.readObject.asInstanceOf[SourceInfo]
- // Sent stopBroadcast signal. listenPort = SourceInfo.StopBroadcast
- gosSource.writeObject(SourceInfo("", SourceInfo.StopBroadcast,
- SourceInfo.UnusedParam, SourceInfo.UnusedParam))
- gosSource.flush
-
- gisSource.close
- gosSource.close
- guideSocketToSource.close
+ var guideSocketToSource: Socket = null
+ var gosSource: ObjectOutputStream = null
+ var gisSource: ObjectInputStream = null
+
+ try {
+ // Connect to the source
+ guideSocketToSource =
+ new Socket (sourceInfo.hostAddress, sourceInfo.listenPort)
+ gosSource =
+ new ObjectOutputStream (guideSocketToSource.getOutputStream)
+ gosSource.flush
+ gisSource =
+ new ObjectInputStream (guideSocketToSource.getInputStream)
+
+ // Throw away whatever comes in
+ gisSource.readObject.asInstanceOf[SourceInfo]
+
+ // Send stopBroadcast signal. listenPort = SourceInfo.StopBroadcast
+ gosSource.writeObject(SourceInfo("", SourceInfo.StopBroadcast,
+ SourceInfo.UnusedParam, SourceInfo.UnusedParam))
+ gosSource.flush
+ } catch {
+ case e: Exception => { }
+ } finally {
+ if (gisSource != null) {
+ gisSource.close
+ }
+ if (gosSource != null) {
+ gosSource.close
+ }
+ if (guideSocketToSource != null) {
+ guideSocketToSource.close
+ }
+ }
}
}
}
@@ -982,7 +997,7 @@ extends BroadcastRecipe with Logging {
if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T]
} else {
- logInfo( "Started reading Broadcasted variable " + uuid)
+ logInfo("Started reading Broadcasted variable " + uuid)
val start = System.nanoTime
val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
@@ -991,7 +1006,7 @@ extends BroadcastRecipe with Logging {
fileIn.close
val time = (System.nanoTime - start) / 1e9
- logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
+ logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}