aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md6
-rw-r--r--core/pom.xml8
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/JavaAPISuite.java33
-rw-r--r--docs/scala-programming-guide.md2
-rw-r--r--new-yarn/pom.xml6
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala13
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala2
-rw-r--r--pom.xml79
-rw-r--r--project/SparkBuild.scala34
-rw-r--r--python/pyspark/context.py3
-rw-r--r--python/pyspark/rdd.py10
-rwxr-xr-xrun-example10
-rwxr-xr-xsbt/sbt21
-rwxr-xr-xspark-class10
-rwxr-xr-xspark-shell19
32 files changed, 227 insertions, 167 deletions
diff --git a/README.md b/README.md
index 80bbe311a9..1550a8b551 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ versions without YARN, use:
# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly
-For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:
# Apache Hadoop 2.0.5-alpha
@@ -63,10 +63,8 @@ with YARN, also set `SPARK_YARN=true`:
# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly
-When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile:
-
# Apache Hadoop 2.2.X and newer
- $ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
+ $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly
When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
diff --git a/core/pom.xml b/core/pom.xml
index cdbaa52731..043f6cf68d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -81,10 +81,6 @@
<artifactId>asm</artifactId>
</dependency>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>0.3.1</version>
@@ -96,10 +92,6 @@
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-actor_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
</dependency>
<dependency>
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 10fae5af9f..ccffcc356c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,8 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
-
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
- private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val timeout = AkkaUtils.askTimeout
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index c47657f512..037cd1c774 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -125,6 +125,8 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
+
+ override def toString = rdd.toString
}
object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9e912d3adb..f344804b4c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -245,6 +245,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return an array that contains all of the elements in a specific partition of this RDD.
+ */
+ def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
+ // This is useful for implementing `take` from other language frontends
+ // like Python where the data is serialized.
+ import scala.collection.JavaConversions._
+ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+ res.map(x => new java.util.ArrayList(x.toSeq)).toArray
+ }
+
+ /**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a659cc06c2..ca42c76928 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -235,10 +235,6 @@ private[spark] object PythonRDD {
file.close()
}
- def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
- implicit val cm : ClassTag[T] = rdd.elementClassTag
- rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
- }
}
private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 4d95efa73a..953755e40d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,14 +23,14 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
-import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
/**
@@ -178,7 +178,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c627dd3806..7b2b1c3327 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -18,19 +18,16 @@
package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
-import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
-import akka.remote._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -537,12 +534,10 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
- val timeoutDuration: FiniteDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
- implicit val timeout = Timeout(timeoutDuration)
- val respFuture = actor ? RequestWebUIPort // ask pattern
- val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
+ val timeout = AkkaUtils.askTimeout
+ val respFuture = actor.ask(RequestWebUIPort)(timeout)
+ val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 3b983c19eb..dbb0cb90f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,32 +17,28 @@
package org.apache.spark.deploy.master.ui
+import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
@@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 65e7a14e7a..4ef762892c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -17,37 +17,33 @@
package org.apache.spark.deploy.master.ui
-import javax.servlet.http.HttpServletRequest
-
+import scala.concurrent.Await
import scala.xml.Node
-import scala.concurrent.Await
import akka.pattern.ask
-import scala.concurrent.duration._
-
+import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a211ce2b42..9ab594b682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,25 +17,21 @@
package org.apache.spark.deploy.master.ui
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 1a768d501f..0d59048313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 6c18a3c245..40d6bdb3fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
-import scala.concurrent.duration._
-
-import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
-import org.eclipse.jetty.server.{Handler, Server}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone worker.
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- implicit val timeout = Timeout(
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f5e8766f6d..7e22c843bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ private val timeout = AkkaUtils.askTimeout
+
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
@@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
- private val timeout = {
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- }
-
def stopExecutors() {
try {
if (driverActor != null) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e05b842476..e1d68ef592 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -18,7 +18,6 @@
package org.apache.spark.storage
import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
@@ -26,15 +25,17 @@ import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.AkkaUtils
-private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+private[spark]
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 154a3980e9..21022e1cfb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConversions._
+import scala.concurrent.Future
+import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
-import scala.concurrent.duration._
-import scala.concurrent.Future
-
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
-
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- val akkaTimeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val akkaTimeout = AkkaUtils.askTimeout
initLogging()
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a5446b3fc3..39f422dd6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 74133cef6c..1c8b51b8bc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
@@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
(actorSystem, boundPort)
}
+ /** Returns the default Spark timeout to use for Akka ask operations. */
+ def askTimeout: FiniteDuration = {
+ Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index 0b51c23f7b..a38329df03 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -34,6 +34,8 @@ class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
override def iterator: Iterator[A] = underlying.iterator.asScala
+ override def size: Int = underlying.size
+
override def ++=(xs: TraversableOnce[A]): this.type = {
xs.foreach { this += _ }
this
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6eac7..79913dc718 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -897,4 +897,37 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
}
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+
+ List[] parts = rdd1.collectPartitions(new int[] {0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartitions(new int[] {0})[0]);
+
+ parts = rdd2.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)),
+ parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)),
+ parts[1]);
+ }
+
}
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 94e8563a8b..56d2a3a4a0 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -363,7 +363,7 @@ res2: Int = 10
# Where to Go from Here
-You can see some [example Spark programs](http://www.spark-project.org/examples.html) on the Spark website.
+You can see some [example Spark programs](http://spark.incubator.apache.org/examples.html) on the Spark website.
In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `run-example` script included in Spark; for example:
./run-example org.apache.spark.examples.SparkPi
diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml
index 8a065c6d7d..4cd28f34e3 100644
--- a/new-yarn/pom.xml
+++ b/new-yarn/pom.xml
@@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.9.3</artifactId>
+ <artifactId>spark-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -63,7 +63,7 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index c38f33e212..bc31bb2eb0 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote._
import akka.actor.Terminated
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -54,17 +54,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
// This actor just working as a monitor to watch on Driver Actor.
class MonitorActor(driverUrl: String) extends Actor {
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ driver = context.actorSelection(driverUrl)
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case x: DisassociatedEvent =>
logInfo("Driver terminated or disconnected! Shutting down.")
driverClosed = true
}
@@ -140,7 +139,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
System.setProperty("spark.driver.host", driverHost)
System.setProperty("spark.driver.port", driverPort.toString)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index dba0f7640e..c27257cda4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -253,7 +253,7 @@ private[yarn] class YarnAllocationHandler(
numWorkersRunning.decrementAndGet()
} else {
val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/pom.xml b/pom.xml
index fd99fabc15..57e843596f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,12 +104,12 @@
<scala.version>2.10.3</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<mesos.version>0.13.0</mesos.version>
- <akka.version>2.2.3</akka.version>
- <akka.group>com.typesafe.akka</akka.group>
- <protobuf.version>2.4.1</protobuf.version>
+ <akka.group>org.spark-project.akka</akka.group>
+ <akka.version>2.2.3-shaded-protobuf</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.0.4</hadoop.version>
+ <protobuf.version>2.4.1</protobuf.version>
<yarn.version>0.23.7</yarn.version>
<hbase.version>0.94.6</hbase.version>
@@ -121,7 +121,7 @@
<repository>
<id>maven-repo</id> <!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution -->
<name>Maven Repository</name>
- <url>http://repo.maven.apache.org/maven2/</url>
+ <url>http://repo.maven.apache.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -132,7 +132,7 @@
<repository>
<id>jboss-repo</id>
<name>JBoss Repository</name>
- <url>http://repository.jboss.org/nexus/content/repositories/releases/</url>
+ <url>http://repository.jboss.org/nexus/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -143,7 +143,7 @@
<repository>
<id>mqtt-repo</id>
<name>MQTT Repository</name>
- <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -200,6 +200,11 @@
<artifactId>asm</artifactId>
<version>4.0</version>
</dependency>
+ <!-- In theory we need not directly depend on protobuf since Spark does not directly
+ use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
+ the protobuf version up from the one Mesos gives. For now we include this variable
+ to explicitly bump the version when building with YARN. It would be nice to figure
+ out why Maven can't resolve this correctly (like SBT does). -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -461,6 +466,7 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
@@ -716,6 +722,7 @@
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
<hadoop.version>0.23.7</hadoop.version>
+ <protobuf.version>2.5.0</protobuf.version>
<!--<hadoop.version>2.0.5-alpha</hadoop.version> -->
</properties>
@@ -727,7 +734,7 @@
<repository>
<id>maven-root</id>
<name>Maven root repository</name>
- <url>http://repo1.maven.org/maven2/</url>
+ <url>http://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
@@ -743,39 +750,37 @@
</dependencyManagement>
</profile>
- <!-- <profile> -->
- <!-- <id>new-yarn</id> -->
- <!-- <properties> -->
- <!-- <akka.group>org.spark-project</akka.group> -->
- <!-- <akka.version>2.0.5-protobuf-2.5-java-1.5</akka.version> -->
- <!-- <hadoop.major.version>2</hadoop.major.version> -->
- <!-- <hadoop.version>2.2.0</hadoop.version> -->
- <!-- <protobuf.version>2.5.0</protobuf.version> -->
- <!-- </properties> -->
+ <profile>
+ <id>new-yarn</id>
+ <properties>
+ <hadoop.major.version>2</hadoop.major.version>
+ <hadoop.version>2.2.0</hadoop.version>
+ <protobuf.version>2.5.0</protobuf.version>
+ </properties>
- <!-- <modules> -->
- <!-- <module>new-yarn</module> -->
- <!-- </modules> -->
+ <modules>
+ <module>new-yarn</module>
+ </modules>
- <!-- <repositories> -->
- <!-- <repository> -->
- <!-- <id>maven-root</id> -->
- <!-- <name>Maven root repository</name> -->
- <!-- <url>http://repo1.maven.org/maven2/</url> -->
- <!-- <releases> -->
- <!-- <enabled>true</enabled> -->
- <!-- </releases> -->
- <!-- <snapshots> -->
- <!-- <enabled>false</enabled> -->
- <!-- </snapshots> -->
- <!-- </repository> -->
- <!-- </repositories> -->
+ <repositories>
+ <repository>
+ <id>maven-root</id>
+ <name>Maven root repository</name>
+ <url>http://repo1.maven.org/maven2</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
- <!-- <dependencyManagement> -->
- <!-- <dependencies> -->
- <!-- </dependencies> -->
- <!-- </dependencyManagement> -->
- <!-- </profile> -->
+ <dependencyManagement>
+ <dependencies>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
<profile>
<id>repl-bin</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 441dcc18fb..ab96cfa18b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -84,21 +84,10 @@ object SparkBuild extends Build {
case Some(v) => v.toBoolean
}
- if (isNewHadoop && isYarnEnabled) {
- println( """Yarn with Hadoop version 2.2.x is not yet expected to work.
- Please set env SPARK_HADOOP_VERSION to appropriate version or set SPARK_YARN to false.""")
- throw new Exception("Yarn with Hadoop version 2.2.x is not yet expected to work.")
- }
-
- // Build against a protobuf-2.5 compatible Akka if Hadoop 2 is used.
- // lazy val protobufVersion = if (isNewHadoop) "2.5.0" else "2.4.1"
- // lazy val akkaVersion = if (isNewHadoop) "2.0.5-protobuf-2.5-java-1.5" else "2.0.5"
- // lazy val akkaGroup = if (isNewHadoop) "org.spark-project" else "com.typesafe.akka"
-
// Conditionally include the yarn sub-project
- //lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
+ lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
- lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
+ //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
@@ -188,6 +177,8 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+ /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
+ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.9" % "test",
@@ -235,9 +226,8 @@ object SparkBuild extends Build {
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
- "com.google.protobuf" % "protobuf-java" % "2.4.1",
- "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
+ "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+ "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
"net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
@@ -312,16 +302,16 @@ object SparkBuild extends Build {
),
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
+ "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
excludeAll(excludeNetty),
- "org.eclipse.paho" % "mqtt-client" % "0.4.0",
- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty)
+ "org.eclipse.paho" % "mqtt-client" % "0.4.0",
+ "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+ "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
)
)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e58c4..0604f6836c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -43,7 +43,6 @@ class SparkContext(object):
_gateway = None
_jvm = None
_writeToFile = None
- _takePartition = None
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
@@ -134,8 +133,6 @@ class SparkContext(object):
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = \
SparkContext._jvm.PythonRDD.writeToFile
- SparkContext._takePartition = \
- SparkContext._jvm.PythonRDD.takePartition
if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 61720dcf1a..f87923e6fa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -54,6 +54,9 @@ class RDD(object):
self.ctx = ctx
self._jrdd_deserializer = jrdd_deserializer
+ def __repr__(self):
+ return self._jrdd.toString()
+
@property
def context(self):
"""
@@ -576,8 +579,13 @@ class RDD(object):
# Take only up to num elements from each partition we try
mapped = self.mapPartitions(takeUpToNum)
items = []
+ # TODO(shivaram): Similar to the scala implementation, update the take
+ # method to scan multiple splits based on an estimate of how many elements
+ # we have per-split.
for partition in range(mapped._jrdd.splits().size()):
- iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
+ partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+ partitionsToTake[0] = partition
+ iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
items.extend(mapped._collect_iterator_through_file(iterator))
if len(items) >= num:
break
diff --git a/run-example b/run-example
index feade6589a..a78192d31d 100755
--- a/run-example
+++ b/run-example
@@ -17,6 +17,11 @@
# limitations under the License.
#
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
+
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
@@ -59,6 +64,11 @@ fi
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"
+if $cygwin; then
+ CLASSPATH=`cygpath -wp $CLASSPATH`
+ export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
+fi
+
# Find java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
diff --git a/sbt/sbt b/sbt/sbt
index c31a0280ff..5942280585 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -17,12 +17,27 @@
# limitations under the License.
#
-EXTRA_ARGS=""
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
+
+EXTRA_ARGS="-Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m"
if [ "$MESOS_HOME" != "" ]; then
- EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java"
+ EXTRA_ARGS="$EXTRA_ARGS -Djava.library.path=$MESOS_HOME/lib/java"
fi
export SPARK_HOME=$(cd "$(dirname $0)/.." 2>&1 >/dev/null ; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS $SBT_OPTS -jar "$SPARK_HOME"/sbt/sbt-launch-*.jar "$@"
+SBT_JAR="$SPARK_HOME"/sbt/sbt-launch-*.jar
+if $cygwin; then
+ SBT_JAR=`cygpath -w $SBT_JAR`
+ export SPARK_HOME=`cygpath -w $SPARK_HOME`
+ EXTRA_ARGS="$EXTRA_ARGS -Djline.terminal=jline.UnixTerminal -Dsbt.cygwin=true"
+ stty -icanon min 1 -echo > /dev/null 2>&1
+ java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@"
+ stty icanon echo > /dev/null 2>&1
+else
+ java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@"
+fi \ No newline at end of file
diff --git a/spark-class b/spark-class
index 4fa6fb864e..4eb95a9ba2 100755
--- a/spark-class
+++ b/spark-class
@@ -17,6 +17,11 @@
# limitations under the License.
#
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
+
SCALA_VERSION=2.10
# Figure out where the Scala framework is installed
@@ -125,6 +130,11 @@ fi
# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_TOOLS_JAR:$CLASSPATH"
+
+if $cygwin; then
+ CLASSPATH=`cygpath -wp $CLASSPATH`
+ export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
+fi
export CLASSPATH
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
diff --git a/spark-shell b/spark-shell
index 9608bd3f30..d20af0fb39 100755
--- a/spark-shell
+++ b/spark-shell
@@ -23,7 +23,11 @@
# if those two env vars are set in spark-env.sh but MASTER is not.
# Options:
# -c <cores> Set the number of cores for REPL to use
-#
+
+cygwin=false
+case "`uname`" in
+ CYGWIN*) cygwin=true;;
+esac
# Enter posix mode for bash
set -o posix
@@ -79,7 +83,18 @@ if [[ ! $? ]]; then
saved_stty=""
fi
-$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
+if $cygwin; then
+ # Workaround for issue involving JLine and Cygwin
+ # (see http://sourceforge.net/p/jline/bugs/40/).
+ # If you're using the Mintty terminal emulator in Cygwin, may need to set the
+ # "Backspace sends ^H" setting in "Keys" section of the Mintty options
+ # (see https://github.com/sbt/sbt/issues/562).
+ stty -icanon min 1 -echo > /dev/null 2>&1
+ $FWDIR/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@"
+ stty icanon echo > /dev/null 2>&1
+else
+ $FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
+fi
# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.