author    Matei Zaharia <matei@databricks.com>  2015-07-22 15:28:09 -0700
committer Matei Zaharia <matei@databricks.com>  2015-07-22 15:28:09 -0700
commit    fe26584a1f5b472fb2e87aa7259aec822a619a3b (patch)
tree      d568c3aeda422e91d2b3d1a9335605da55be73fa
parent    1aca9c13c144fa336af6afcfa666128bf77c49d4 (diff)
download  spark-fe26584a1f5b472fb2e87aa7259aec822a619a3b.tar.gz
          spark-fe26584a1f5b472fb2e87aa7259aec822a619a3b.tar.bz2
          spark-fe26584a1f5b472fb2e87aa7259aec822a619a3b.zip
[SPARK-9244] Increase some memory defaults
There are a few memory limits that people hit often and that we could make higher, especially now that memory sizes have grown.

- spark.akka.frameSize: this defaults to 10 (MB) but is often hit for map output statuses in large shuffles. This memory is not fully allocated up-front, so we can make it larger without affecting jobs that never send a status that large. We increase it to 128.

- spark.executor.memory: defaults to 512m, which is really small. We increase it to 1g.

Author: Matei Zaharia <matei@databricks.com>

Closes #7586 from mateiz/configs and squashes the following commits:

ce0038a [Matei Zaharia] [SPARK-9244] Increase some memory defaults
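For context, a minimal Scala sketch of overriding these settings explicitly; the app name and the values below are hypothetical, while the config keys are the ones whose defaults change in this commit:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical job that raises both settings above the new defaults.
    val conf = new SparkConf()
      .setAppName("LargeShuffleJob")            // hypothetical app name
      .set("spark.executor.memory", "4g")       // executor heap; new default is 1g
      .set("spark.akka.frameSize", "256")       // in MB; new default is 128
    val sc = new SparkContext(conf)

Jobs that leave these unset simply pick up the new defaults (1g and 128 MB).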
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                |  2
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                                     |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala                            |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala                               | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala                                    |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala                    |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala                                |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala                           |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                                   | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala             |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala                       |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala                  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                        |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala   |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala            |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala                  |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala        |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala      |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala     | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala            | 14
-rw-r--r--  docs/configuration.md                                                                     | 16
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala           |  2
-rw-r--r--  python/pyspark/tests.py                                                                   |  6
-rw-r--r--  repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala                      | 10
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala                      |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala              |  4
27 files changed, 78 insertions(+), 80 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d00c012d80..4976e5eb49 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -471,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
- .getOrElse(512)
+ .getOrElse(1024)
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index c179833e5b..78e7ddc27d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -128,7 +128,7 @@ private[spark] object AkkaUtils extends Logging {
/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
- val frameSizeInMB = conf.getInt("spark.akka.frameSize", 10)
+ val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
throw new IllegalArgumentException(
s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 1b04a3b1cf..e948ca3347 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1783,7 +1783,7 @@ public class JavaAPISuite implements Serializable {
// Stop the context created in setUp() and start a local-cluster one, to force usage of the
// assembly.
sc.stop();
- JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,512]", "JavaAPISuite");
+ JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,1024]", "JavaAPISuite");
try {
JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 501fe186bf..26858ef277 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -292,7 +292,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
sc.stop()
val conf2 = new SparkConf()
- .setMaster("local-cluster[2, 1, 512]")
+ .setMaster("local-cluster[2, 1, 1024]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
@@ -370,7 +370,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
sc.stop()
val conf2 = new SparkConf()
- .setMaster("local-cluster[2, 1, 512]")
+ .setMaster("local-cluster[2, 1, 1024]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 2300bcff4f..600c1403b0 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -29,7 +29,7 @@ class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {
class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- val clusterUrl = "local-cluster[2,1,512]"
+ val clusterUrl = "local-cluster[2,1,1024]"
test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
@@ -40,7 +40,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
val numSlaves = 3
val numPartitions = 10
- sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+ sc = new SparkContext("local-cluster[%s,1,1024]".format(numSlaves), "test")
val data = sc.parallelize(1 to 100, numPartitions).
map(x => throw new NotSerializableExn(new NotSerializableClass))
intercept[SparkException] {
@@ -50,16 +50,16 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("local-cluster format") {
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,1024]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
- sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
+ sc = new SparkContext("local-cluster[2 , 1 , 1024]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
- sc = new SparkContext("local-cluster[2, 1, 512]", "test")
+ sc = new SparkContext("local-cluster[2, 1, 1024]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
- sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
+ sc = new SparkContext("local-cluster[ 2, 1, 1024 ]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
resetSparkContext()
}
@@ -276,7 +276,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
DistributedSuite.amMaster = true
// Using more than two nodes so we don't have a symmetric communication pattern and might
// cache a partially correct list of peers.
- sc = new SparkContext("local-cluster[3,1,512]", "test")
+ sc = new SparkContext("local-cluster[3,1,1024]", "test")
for (i <- 1 to 3) {
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
@@ -294,7 +294,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
test("unpersist RDDs") {
DistributedSuite.amMaster = true
- sc = new SparkContext("local-cluster[3,1,512]", "test")
+ sc = new SparkContext("local-cluster[3,1,1024]", "test")
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
data.count
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index b2262033ca..454b7e607a 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -29,7 +29,7 @@ class DriverSuite extends SparkFunSuite with Timeouts {
ignore("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- val masters = Table("master", "local", "local-cluster[2,1,512]")
+ val masters = Table("master", "local", "local-cluster[2,1,1024]")
forAll(masters) { (master: String) =>
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 140012226f..c38d70252a 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -51,7 +51,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test ensures that the external shuffle service is actually in use for the other tests.
test("using external shuffle service") {
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 876418aa13..1255e71af6 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -139,7 +139,7 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}
test("Distributing files on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
val result = sc.parallelize(testData).reduceByKey {
@@ -153,7 +153,7 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}
test ("Dynamically adding JARS on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
@@ -164,7 +164,7 @@ class FileServerSuite extends SparkFunSuite with LocalSparkContext {
}
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
- sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 340a9e3271..1168eb0b80 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -64,7 +64,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
test("cluster mode, FIFO scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
@@ -75,7 +75,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set("spark.scheduler.allocation.file", xmlPath)
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index b68102bfb9..d91b799ecf 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -47,7 +47,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("shuffle non-zero block size") {
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val NUM_BLOCKS = 3
val a = sc.parallelize(1 to 10, 2)
@@ -73,7 +73,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("shuffle serializer") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(x, new NonJavaSerializableClass(x * 2))
@@ -89,7 +89,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("zero sized blocks") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
@@ -116,7 +116,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("zero sized blocks without kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
val NUM_BLOCKS = 201
@@ -141,7 +141,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("shuffle on mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -154,7 +154,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sorting on mutable pairs") {
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
@@ -168,7 +168,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("cogroup using mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
@@ -195,7 +195,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("subtract mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
@@ -210,7 +210,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sort with Java non serializable class - Kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- sc = new SparkContext("local-cluster[2,1,512]", "test", myConf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
@@ -223,7 +223,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sort with Java non serializable class - Java") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
(new NonJavaSerializableClass(x), x)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index dba46f101c..e5a14a69ef 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -123,7 +123,7 @@ class SparkContextSchedulerCreationSuite
}
test("local-cluster") {
- createTaskScheduler("local-cluster[3, 14, 512]").backend match {
+ createTaskScheduler("local-cluster[3, 14, 1024]").backend match {
case s: SparkDeploySchedulerBackend => // OK
case _ => fail()
}
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c054c71807..48e74f06f7 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -69,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val conf = httpConf.clone
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.broadcast.compress", "true")
- sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
+ sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
@@ -97,7 +97,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val conf = torrentConf.clone
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.broadcast.compress", "true")
- sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
+ sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
@@ -125,7 +125,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
test("Test Lazy Broadcast variables with TorrentBroadcast") {
val numSlaves = 2
val conf = torrentConf.clone
- sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
+ sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val rdd = sc.parallelize(1 to numSlaves)
val results = new DummyBroadcastClass(rdd).doSomething()
@@ -308,7 +308,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
sc = if (distributed) {
val _sc =
- new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+ new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", broadcastConf)
// Wait until all salves are up
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index ddc92814c0..cbd2aee10c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -33,7 +33,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
private val WAIT_TIMEOUT_MILLIS = 10000
test("verify that correct log urls get propagated from workers") {
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ sc = new SparkContext("local-cluster[2,1,1024]", "test")
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
@@ -66,7 +66,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
}
val conf = new MySparkConf().set(
"spark.extraListeners", classOf[SaveExecutorInfo].getName)
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 343d28eef8..aa78bfe309 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -337,7 +337,7 @@ class SparkSubmitSuite
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
- "--master", "local-cluster[2,1,512]",
+ "--master", "local-cluster[2,1,1024]",
"--jars", jarsString,
unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
runSparkSubmit(args)
@@ -352,7 +352,7 @@ class SparkSubmitSuite
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
- "--master", "local-cluster[2,1,512]",
+ "--master", "local-cluster[2,1,1024]",
"--packages", Seq(main, dep).mkString(","),
"--repositories", repo,
"--conf", "spark.ui.enabled=false",
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 3414569115..eef6aafa62 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -26,7 +26,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
val conf = new SparkConf
conf.set("spark.akka.frameSize", "1")
conf.set("spark.default.parallelism", "1")
- sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+ sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index f681f21b62..5cb2d4225d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -180,7 +180,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
- val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
+ val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val eventLogPath = eventLogger.logPath
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 4e3defb43a..103fc19369 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -102,7 +102,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
fileSystem.mkdirs(logDirPath)
val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
- val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
+ val sc = new SparkContext("local-cluster[2,1,1024]", "Test replay", conf)
// Run a few jobs
sc.parallelize(1 to 100, 1).count()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index d97fba0097..d1e23ed527 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -34,7 +34,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
val WAIT_TIMEOUT_MILLIS = 10000
before {
- sc = new SparkContext("local-cluster[2,1,512]", "SparkListenerSuite")
+ sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
}
test("SparkListener sends executor added message") {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index 353b97469c..935a091f14 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -35,7 +35,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
conf.setJars(List(jar.getPath))
- val sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ val sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val original = Thread.currentThread.getContextClassLoader
val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
SparkEnv.get.serializer.setDefaultClassLoader(loader)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 79eba61a87..9c362f0de7 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -244,7 +244,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// reduceByKey - should spill ~8 times
val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
@@ -292,7 +292,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[String]
val collisionPairs = Seq(
@@ -341,7 +341,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
@@ -366,7 +366,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
(1 to 100000).foreach { i => map.insert(i, i) }
@@ -383,7 +383,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
map.insertAll((1 to 100000).iterator.map(i => (i, i)))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 9cefa612f5..986cd8623d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -176,7 +176,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
def testSpillingInLocalCluster(conf: SparkConf) {
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// reduceByKey - should spill ~8 times
val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
@@ -254,7 +254,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
- sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// reduceByKey - should spill ~4 times per executor
val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
@@ -554,7 +554,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(true, false)
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
@@ -611,7 +611,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(true, false)
conf.set("spark.shuffle.memoryFraction", "0.0001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
val sorter = new ExternalSorter[FixedHashObject, Int, Int](Some(agg), None, None, None)
@@ -634,7 +634,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(true, false)
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
def mergeValue(buffer: ArrayBuffer[Int], i: Int): ArrayBuffer[Int] = buffer += i
@@ -658,7 +658,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(true, false)
conf.set("spark.shuffle.memoryFraction", "0.001")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
@@ -695,7 +695,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
def sortWithoutBreakingSortingContracts(conf: SparkConf) {
conf.set("spark.shuffle.memoryFraction", "0.01")
conf.set("spark.shuffle.manager", "sort")
- sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// Using wrongOrdering to show integer overflow introduced exception.
val rand = new Random(100L)
diff --git a/docs/configuration.md b/docs/configuration.md
index 8a186ee51c..fea259204a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -31,7 +31,6 @@ which can help detect bugs that only exist when we run in a distributed context.
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
- .set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
{% endhighlight %}
@@ -84,7 +83,7 @@ Running `./bin/spark-submit --help` will show the entire list of these options.
each line consists of a key and a value separated by whitespace. For example:
spark.master spark://5.6.7.8:7077
- spark.executor.memory 512m
+ spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
@@ -150,10 +149,9 @@ of the most common options to set are:
</tr>
<tr>
<td><code>spark.executor.memory</code></td>
- <td>512m</td>
+ <td>1g</td>
<td>
- Amount of memory to use per executor process, in the same format as JVM memory strings
- (e.g. <code>512m</code>, <code>2g</code>).
+ Amount of memory to use per executor process (e.g. <code>2g</code>, <code>8g</code>).
</td>
</tr>
<tr>
@@ -886,11 +884,11 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.akka.frameSize</code></td>
- <td>10</td>
+ <td>128</td>
<td>
- Maximum message size to allow in "control plane" communication (for serialized tasks and task
- results), in MB. Increase this if your tasks need to send back large results to the driver
- (e.g. using <code>collect()</code> on a large dataset).
+ Maximum message size to allow in "control plane" communication; generally only applies to map
+ output size information sent between executors and the driver. Increase this if you are running
+ jobs with many thousands of map and reduce tasks and see messages about the frame size.
</td>
</tr>
<tr>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
index 5e9101cdd3..525ab68c79 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -26,7 +26,7 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
override def beforeAll() {
val conf = new SparkConf()
- .setMaster("local-cluster[2, 1, 512]")
+ .setMaster("local-cluster[2, 1, 1024]")
.setAppName("test-cluster")
.set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data
sc = new SparkContext(conf)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 5be9937cb0..8bfed074c9 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1823,7 +1823,7 @@ class SparkSubmitTests(unittest.TestCase):
| return x + 1
""")
proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
- "local-cluster[1,1,512]", script],
+ "local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
@@ -1857,7 +1857,7 @@ class SparkSubmitTests(unittest.TestCase):
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, "--master",
- "local-cluster[1,1,512]", script], stdout=subprocess.PIPE)
+ "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -1876,7 +1876,7 @@ class SparkSubmitTests(unittest.TestCase):
# this will fail if you have different spark.executor.memory
# in conf/spark-defaults.conf
proc = subprocess.Popen(
- [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script],
+ [self.sparkSubmit, "--master", "local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index f150fec7db..5674dcd669 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -211,7 +211,7 @@ class ReplSuite extends SparkFunSuite {
}
test("local-cluster mode") {
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|var v = 7
|def getV() = v
@@ -233,7 +233,7 @@ class ReplSuite extends SparkFunSuite {
}
test("SPARK-1199 two instances of same class don't type check.") {
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|case class Sum(exp: String, exp2: String)
|val a = Sum("A", "B")
@@ -256,7 +256,7 @@ class ReplSuite extends SparkFunSuite {
test("SPARK-2576 importing SQLContext.implicits._") {
// We need to use local-cluster to test this case.
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
@@ -325,9 +325,9 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
-
+
test("collecting objects of class defined in repl - shuffling") {
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|case class Foo(i: Int)
|val list = List((1, Foo(1)), (1, Foo(2)))
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index e1cee97de3..bf8997998e 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -209,7 +209,7 @@ class ReplSuite extends SparkFunSuite {
}
test("local-cluster mode") {
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|var v = 7
|def getV() = v
@@ -231,7 +231,7 @@ class ReplSuite extends SparkFunSuite {
}
test("SPARK-1199 two instances of same class don't type check.") {
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|case class Sum(exp: String, exp2: String)
|val a = Sum("A", "B")
@@ -254,7 +254,7 @@ class ReplSuite extends SparkFunSuite {
test("SPARK-2576 importing SQLContext.createDataFrame.") {
// We need to use local-cluster to test this case.
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
@@ -314,7 +314,7 @@ class ReplSuite extends SparkFunSuite {
}
test("collecting objects of class defined in repl - shuffling") {
- val output = runInterpreter("local-cluster[1,1,512]",
+ val output = runInterpreter("local-cluster[1,1,1024]",
"""
|case class Foo(i: Int)
|val list = List((1, Foo(1)), (1, Foo(2)))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index bee2ecbedb..72b35959a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -53,7 +53,7 @@ class HiveSparkSubmitSuite
val args = Seq(
"--class", SparkSubmitClassLoaderTest.getClass.getName.stripSuffix("$"),
"--name", "SparkSubmitClassLoaderTest",
- "--master", "local-cluster[2,1,512]",
+ "--master", "local-cluster[2,1,1024]",
"--jars", jarsString,
unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
runSparkSubmit(args)
@@ -64,7 +64,7 @@ class HiveSparkSubmitSuite
val args = Seq(
"--class", SparkSQLConfTest.getClass.getName.stripSuffix("$"),
"--name", "SparkSQLConfTest",
- "--master", "local-cluster[2,1,512]",
+ "--master", "local-cluster[2,1,1024]",
unusedJar.toString)
runSparkSubmit(args)
}