diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-06 19:37:27 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-06 19:37:27 -0700 |
commit | cbce44304c7c8219a6fee69116bc5bdde83e1018 (patch) | |
tree | 61879589931b2443c6fdf0ee7c772a9a5c8d8127 | |
parent | c37c74919f31ec69e359786bc70d40b7d7319e22 (diff) | |
download | spark-cbce44304c7c8219a6fee69116bc5bdde83e1018.tar.gz spark-cbce44304c7c8219a6fee69116bc5bdde83e1018.tar.bz2 spark-cbce44304c7c8219a6fee69116bc5bdde83e1018.zip |
Added exception handling to 'sendStopBroadcastNotifications'
-rw-r--r-- | src/scala/spark/Broadcast.scala | 65 |
1 files changed, 40 insertions, 25 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 0a49514933..4a13d576b1 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -312,7 +312,7 @@ extends BroadcastRecipe with Logging { // Keep exchaning information until all blocks have been received while (hasBlocks < totalBlocks) { talkOnce - Thread.sleep ( BroadcastBT.ranGen.nextInt ( + Thread.sleep (BroadcastBT.ranGen.nextInt ( BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) + BroadcastBT.MinKnockInterval) } @@ -387,7 +387,7 @@ extends BroadcastRecipe with Logging { } } - Thread.sleep ( BroadcastBT.ranGen.nextInt ( + Thread.sleep (BroadcastBT.ranGen.nextInt ( BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) + BroadcastBT.MinKnockInterval) @@ -403,8 +403,8 @@ extends BroadcastRecipe with Logging { if (gInfo.listenPort == SourceInfo.TxOverGoToHDFS || gInfo.listenPort == SourceInfo.TxNotStartedRetry) { - // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go - // to HDFS anyway when receiveBroadcast returns false + // TODO: SourceInfo.TxNotStartedRetry is not really in use because we go + // to HDFS anyway when receiveBroadcast returns false return false } @@ -699,26 +699,41 @@ extends BroadcastRecipe with Logging { private def sendStopBroadcastNotifications = { listOfSources.synchronized { listOfSources.foreach { sourceInfo => - // Connect to the source - var guideSocketToSource = - new Socket (sourceInfo.hostAddress, sourceInfo.listenPort) - var gosSource = - new ObjectOutputStream (guideSocketToSource.getOutputStream) - gosSource.flush - var gisSource = - new ObjectInputStream (guideSocketToSource.getInputStream) - - // Throw away whatever comes in - gisSource.readObject.asInstanceOf[SourceInfo] - // Sent stopBroadcast signal. listenPort = SourceInfo.StopBroadcast - gosSource.writeObject(SourceInfo("", SourceInfo.StopBroadcast, - SourceInfo.UnusedParam, SourceInfo.UnusedParam)) - gosSource.flush - - gisSource.close - gosSource.close - guideSocketToSource.close + var guideSocketToSource: Socket = null + var gosSource: ObjectOutputStream = null + var gisSource: ObjectInputStream = null + + try { + // Connect to the source + guideSocketToSource = + new Socket (sourceInfo.hostAddress, sourceInfo.listenPort) + gosSource = + new ObjectOutputStream (guideSocketToSource.getOutputStream) + gosSource.flush + gisSource = + new ObjectInputStream (guideSocketToSource.getInputStream) + + // Throw away whatever comes in + gisSource.readObject.asInstanceOf[SourceInfo] + + // Send stopBroadcast signal. listenPort = SourceInfo.StopBroadcast + gosSource.writeObject(SourceInfo("", SourceInfo.StopBroadcast, + SourceInfo.UnusedParam, SourceInfo.UnusedParam)) + gosSource.flush + } catch { + case e: Exception => { } + } finally { + if (gisSource != null) { + gisSource.close + } + if (gosSource != null) { + gosSource.close + } + if (guideSocketToSource != null) { + guideSocketToSource.close + } + } } } } @@ -982,7 +997,7 @@ extends BroadcastRecipe with Logging { if (cachedVal != null) { value_ = cachedVal.asInstanceOf[T] } else { - logInfo( "Started reading Broadcasted variable " + uuid) + logInfo("Started reading Broadcasted variable " + uuid) val start = System.nanoTime val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) @@ -991,7 +1006,7 @@ extends BroadcastRecipe with Logging { fileIn.close val time = (System.nanoTime - start) / 1e9 - logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s") + logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s") } } } |