From e46d547ccd43c0fb3a79a30a7c43a78afba6f93f Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Tue, 30 Apr 2013 16:15:56 +0530 Subject: Fix issues reported by Reynold --- .../scala/spark/network/ConnectionManager.scala | 64 ++++++++++++++++++---- run | 7 ++- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 0c6bdb1559..a79fce8697 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } ) } + // MUST be called within selector loop - else deadlock. + private def triggerForceCloseByException(key: SelectionKey, e: Exception) { + try { + key.interestOps(0) + } catch { + // ignore exceptions + case e: Exception => logDebug("Ignoring exception", e) + } + + val conn = connectionsByKey.getOrElse(key, null) + if (conn == null) return + + // Pushing to connect threadpool + handleConnectExecutor.execute(new Runnable { + override def run() { + try { + conn.callOnExceptionCallback(e) + } catch { + // ignore exceptions + case e: Exception => logDebug("Ignoring exception", e) + } + try { + conn.close() + } catch { + // ignore exceptions + case e: Exception => logDebug("Ignoring exception", e) + } + } + }) + } + + def run() { try { while(!selectorThread.isInterrupted) { @@ -235,18 +267,26 @@ private[spark] class ConnectionManager(port: Int) extends Logging { while (selectedKeys.hasNext()) { val key = selectedKeys.next selectedKeys.remove() - if (key.isValid) { - if (key.isAcceptable) { - acceptConnection(key) - } else - if (key.isConnectable) { - triggerConnect(key) - } else - if (key.isReadable) { - triggerRead(key) - } else - if (key.isWritable) { - triggerWrite(key) + try { + if (key.isValid) { + if (key.isAcceptable) { + acceptConnection(key) + } else + if (key.isConnectable) { + triggerConnect(key) + } else + if (key.isReadable) { + triggerRead(key) + } else + if (key.isWritable) { + triggerWrite(key) + } + } + } catch { + // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) } } } diff --git a/run b/run index 756f8703f2..0a58ac4a36 100755 --- a/run +++ b/run @@ -95,6 +95,7 @@ export JAVA_OPTS CORE_DIR="$FWDIR/core" REPL_DIR="$FWDIR/repl" +REPL_BIN_DIR="$FWDIR/repl-bin" EXAMPLES_DIR="$FWDIR/examples" BAGEL_DIR="$FWDIR/bagel" STREAMING_DIR="$FWDIR/streaming" @@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then CLASSPATH+=":$FWDIR/lib_managed/bundles/*" fi CLASSPATH+=":$REPL_DIR/lib/*" -if [ -e repl-bin/target ]; then - for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do +if [ -e $REPL_BIN_DIR/target ]; then + for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do CLASSPATH+=":$jar" done fi @@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do CLASSPATH+=":$jar" done -export CLASSPATH # Needed for spark-shell # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack # to avoid the -sources and -doc packages that are built by publish-local. @@ -163,4 +163,5 @@ else EXTRA_ARGS="$JAVA_OPTS" fi +export CLASSPATH # Needed for spark-shell exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" -- cgit v1.2.3