aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/ParallelShuffleFetcher.scala68
-rw-r--r--project/build/SparkProject.scala1
2 files changed, 24 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/ParallelShuffleFetcher.scala b/core/src/main/scala/spark/ParallelShuffleFetcher.scala
index b9e4034e1f..95dfb01aac 100644
--- a/core/src/main/scala/spark/ParallelShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ParallelShuffleFetcher.scala
@@ -12,26 +12,9 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import org.eclipse.jetty.client.ContentExchange
-import org.eclipse.jetty.client.HttpClient
-import org.eclipse.jetty.client.HttpExchange
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-
class ParallelShuffleFetcher extends ShuffleFetcher with Logging {
val parallelFetches = System.getProperty("spark.parallel.fetches", "3").toInt
- val httpClient = createHttpClient()
-
- private def createHttpClient(): HttpClient = {
- val client = new HttpClient
- client.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
- val threadPool = new QueuedThreadPool
- threadPool.setDaemon(true)
- client.setThreadPool(threadPool);
- client.setTimeout(30000);
- client.start();
- client
- }
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
logInfo("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
@@ -70,28 +53,29 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging {
return
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, reduceId)
logInfo("Starting HTTP request for " + url)
- var exception: Throwable = null
- val exchange = new ContentExchange(true) {
- override def onException(e: Throwable) { exception = e }
- }
- exchange.setURL(url)
- httpClient.send(exchange)
- val status = exchange.waitForDone()
- logInfo("Finished HTTP request for " + url + ", status = " + status)
- status match {
- case HttpExchange.STATUS_COMPLETED =>
- resultQueue.put(exchange.getResponseContentBytes())
- case HttpExchange.STATUS_EXCEPTED =>
- logError("Fetch failed from " + url + " with exception", exception)
- failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, exception))
- return
- case HttpExchange.STATUS_EXPIRED =>
- logError("Fetch failed from " + url + " with expired status (timeout)")
- failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, null))
- return
- case other =>
- logError("Fetch failed from " + url + " with unknown Jetty status " + other)
- failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, null))
+ try {
+ val conn = new URL(url).openConnection()
+ conn.connect()
+ val len = conn.getContentLength()
+ if (len == -1)
+ throw new SparkException("Content length was not specified by server")
+ val buf = new Array[Byte](len)
+ val in = conn.getInputStream()
+ var pos = 0
+ while (pos < len) {
+ val n = in.read(buf, pos, len-pos)
+ if (n == -1)
+ throw new SparkException("EOF before reading the expected " + len + " bytes")
+ else
+ pos += n
+ }
+ // Done reading everything
+ resultQueue.put(buf)
+ in.close()
+ } catch {
+ case e: Exception =>
+ logError("Fetch failed from " + url, e)
+ failure.set(new FetchFailedException(serverUri, shuffleId, i, reduceId, e))
return
}
}
@@ -121,14 +105,10 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging {
resultsDone += 1
//logInfo("Results done = " + resultsDone)
}
- } catch {case e: InterruptedException => {}}
+ } catch { case e: InterruptedException => {} }
}
if (failure.get != null) {
throw failure.get
}
}
-
- override def stop() {
- httpClient.stop()
- }
}
diff --git a/project/build/SparkProject.scala b/project/build/SparkProject.scala
index 908f2f9b79..58908731a3 100644
--- a/project/build/SparkProject.scala
+++ b/project/build/SparkProject.scala
@@ -20,7 +20,6 @@ class SparkProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
override def compileOptions = super.compileOptions ++ Seq(Unchecked)
lazy val jettyServer = "org.eclipse.jetty" % "jetty-server" % "7.4.2.v20110526"
- lazy val jettyClient = "org.eclipse.jetty" % "jetty-client" % "7.4.2.v20110526"
override def packageDocsJar = defaultJarPath("-javadoc.jar")
override def packageSrcJar= defaultJarPath("-sources.jar")