aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala64
-rwxr-xr-xrun7
2 files changed, 56 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 0c6bdb1559..a79fce8697 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -188,6 +188,38 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} )
}
+ // MUST be called within selector loop - else deadlock.
+ private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
+ try {
+ key.interestOps(0)
+ } catch {
+ // ignore exceptions
+ case e: Exception => logDebug("Ignoring exception", e)
+ }
+
+ val conn = connectionsByKey.getOrElse(key, null)
+ if (conn == null) return
+
+ // Pushing to connect threadpool
+ handleConnectExecutor.execute(new Runnable {
+ override def run() {
+ try {
+ conn.callOnExceptionCallback(e)
+ } catch {
+ // ignore exceptions
+ case e: Exception => logDebug("Ignoring exception", e)
+ }
+ try {
+ conn.close()
+ } catch {
+ // ignore exceptions
+ case e: Exception => logDebug("Ignoring exception", e)
+ }
+ }
+ })
+ }
+
+
def run() {
try {
while(!selectorThread.isInterrupted) {
@@ -235,18 +267,26 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
while (selectedKeys.hasNext()) {
val key = selectedKeys.next
selectedKeys.remove()
- if (key.isValid) {
- if (key.isAcceptable) {
- acceptConnection(key)
- } else
- if (key.isConnectable) {
- triggerConnect(key)
- } else
- if (key.isReadable) {
- triggerRead(key)
- } else
- if (key.isWritable) {
- triggerWrite(key)
+ try {
+ if (key.isValid) {
+ if (key.isAcceptable) {
+ acceptConnection(key)
+ } else
+ if (key.isConnectable) {
+ triggerConnect(key)
+ } else
+ if (key.isReadable) {
+ triggerRead(key)
+ } else
+ if (key.isWritable) {
+ triggerWrite(key)
+ }
+ }
+ } catch {
+ // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
+ case e: CancelledKeyException => {
+ logInfo("key already cancelled ? " + key, e)
+ triggerForceCloseByException(key, e)
}
}
}
diff --git a/run b/run
index 756f8703f2..0a58ac4a36 100755
--- a/run
+++ b/run
@@ -95,6 +95,7 @@ export JAVA_OPTS
CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl"
+REPL_BIN_DIR="$FWDIR/repl-bin"
EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
STREAMING_DIR="$FWDIR/streaming"
@@ -125,8 +126,8 @@ if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
fi
CLASSPATH+=":$REPL_DIR/lib/*"
-if [ -e repl-bin/target ]; then
- for jar in `find "repl-bin/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+if [ -e $REPL_BIN_DIR/target ]; then
+ for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
CLASSPATH+=":$jar"
done
fi
@@ -134,7 +135,6 @@ CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar"
done
-export CLASSPATH # Needed for spark-shell
# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
@@ -163,4 +163,5 @@ else
EXTRA_ARGS="$JAVA_OPTS"
fi
+export CLASSPATH # Needed for spark-shell
exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"