diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-11 22:12:36 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-11 22:12:36 -0400 |
commit | d05fea24f3c697e3b62f10fb794e9d51ef4441ea (patch) | |
tree | adc6a1aba79fafb8020ddfa902069b5814d4f54f | |
parent | 25c3a7781cf4d1fe7a476ddf997afee2a2d6a0dd (diff) | |
download | spark-d05fea24f3c697e3b62f10fb794e9d51ef4441ea.tar.gz spark-d05fea24f3c697e3b62f10fb794e9d51ef4441ea.tar.bz2 spark-d05fea24f3c697e3b62f10fb794e9d51ef4441ea.zip |
Simplified parallel shuffle fetcher to use URLConnection
-rw-r--r-- | core/src/main/scala/spark/ParallelShuffleFetcher.scala | 68 | ||||
-rw-r--r-- | project/build/SparkProject.scala | 1 |
2 files changed, 24 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/ParallelShuffleFetcher.scala b/core/src/main/scala/spark/ParallelShuffleFetcher.scala index b9e4034e1f..95dfb01aac 100644 --- a/core/src/main/scala/spark/ParallelShuffleFetcher.scala +++ b/core/src/main/scala/spark/ParallelShuffleFetcher.scala @@ -12,26 +12,9 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import org.eclipse.jetty.client.ContentExchange -import org.eclipse.jetty.client.HttpClient -import org.eclipse.jetty.client.HttpExchange -import org.eclipse.jetty.util.thread.QueuedThreadPool - class ParallelShuffleFetcher extends ShuffleFetcher with Logging { val parallelFetches = System.getProperty("spark.parallel.fetches", "3").toInt - val httpClient = createHttpClient() - - private def createHttpClient(): HttpClient = { - val client = new HttpClient - client.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); - val threadPool = new QueuedThreadPool - threadPool.setDaemon(true) - client.setThreadPool(threadPool); - client.setTimeout(30000); - client.start(); - client - } def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) { logInfo("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) @@ -70,28 +53,29 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging { return val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, reduceId) logInfo("Starting HTTP request for " + url) - var exception: Throwable = null - val exchange = new ContentExchange(true) { - override def onException(e: Throwable) { exception = e } - } - exchange.setURL(url) - httpClient.send(exchange) - val status = exchange.waitForDone() - logInfo("Finished HTTP request for " + url + ", status = " + status) - status match { - case HttpExchange.STATUS_COMPLETED => - resultQueue.put(exchange.getResponseContentBytes()) - case HttpExchange.STATUS_EXCEPTED => - logError("Fetch failed from " + url + " with exception", exception) - failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, exception)) - return - case HttpExchange.STATUS_EXPIRED => - logError("Fetch failed from " + url + " with expired status (timeout)") - failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, null)) - return - case other => - logError("Fetch failed from " + url + " with unknown Jetty status " + other) - failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, null)) + try { + val conn = new URL(url).openConnection() + conn.connect() + val len = conn.getContentLength() + if (len == -1) + throw new SparkException("Content length was not specified by server") + val buf = new Array[Byte](len) + val in = conn.getInputStream() + var pos = 0 + while (pos < len) { + val n = in.read(buf, pos, len-pos) + if (n == -1) + throw new SparkException("EOF before reading the expected " + len + " bytes") + else + pos += n + } + // Done reading everything + resultQueue.put(buf) + in.close() + } catch { + case e: Exception => + logError("Fetch failed from " + url, e) + failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, e)) return } } @@ -121,14 +105,10 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging { resultsDone += 1 //logInfo("Results done = " + resultsDone) } - } catch {case e: InterruptedException => {}} + } catch { case e: InterruptedException => {} } } if (failure.get != null) { throw failure.get } } - - override def stop() { - httpClient.stop() - } } diff --git a/project/build/SparkProject.scala b/project/build/SparkProject.scala index 908f2f9b79..58908731a3 100644 --- a/project/build/SparkProject.scala +++ b/project/build/SparkProject.scala @@ -20,7 +20,6 @@ class SparkProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje override def compileOptions = super.compileOptions ++ Seq(Unchecked) lazy val jettyServer = "org.eclipse.jetty" % "jetty-server" % "7.4.2.v20110526" - lazy val jettyClient = "org.eclipse.jetty" % "jetty-client" % "7.4.2.v20110526" override def packageDocsJar = defaultJarPath("-javadoc.jar") override def packageSrcJar= defaultJarPath("-sources.jar") |