aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala12
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java8
-rw-r--r--project/SparkBuild.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala10
4 files changed, 7 insertions, 26 deletions
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index dc3a28e27c..e626ed3621 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -51,10 +51,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
assert(valuesFor2.toList.sorted === List(1))
}
- // Some tests using `local-cluster` here are failed on Windows due to the failure of initiating
- // executors by the path length limitation. See SPARK-18718.
test("shuffle non-zero block size") {
- assume(!Utils.isWindows)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val NUM_BLOCKS = 3
@@ -80,7 +77,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("shuffle serializer") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
@@ -97,7 +93,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("zero sized blocks") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -125,7 +120,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("zero sized blocks without kryo") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -151,7 +145,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("shuffle on mutable pairs") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -164,7 +157,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("sorting on mutable pairs") {
- assume(!Utils.isWindows)
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -180,7 +172,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("cogroup using mutable pairs") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -208,7 +199,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("subtract mutable pairs") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -223,7 +213,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("sort with Java non serializable class - Kryo") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
@@ -238,7 +227,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
test("sort with Java non serializable class - Java") {
- assume(!Utils.isWindows)
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
val a = sc.parallelize(1 to 10, 2)
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index ba43659d96..0622fef17c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -26,9 +26,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -135,7 +137,7 @@ abstract class AbstractCommandBuilder {
List<String> buildClassPath(String appClassPath) throws IOException {
String sparkHome = getSparkHome();
- List<String> cp = new ArrayList<>();
+ Set<String> cp = new LinkedHashSet<>();
addToClassPath(cp, getenv("SPARK_CLASSPATH"));
addToClassPath(cp, appClassPath);
@@ -201,7 +203,7 @@ abstract class AbstractCommandBuilder {
addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
addToClassPath(cp, getenv("YARN_CONF_DIR"));
addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
- return cp;
+ return new ArrayList<>(cp);
}
/**
@@ -210,7 +212,7 @@ abstract class AbstractCommandBuilder {
* @param cp List to which the new entries are appended.
* @param entries New classpath entries (separated by File.pathSeparator).
*/
- private void addToClassPath(List<String> cp, String entries) {
+ private void addToClassPath(Set<String> cp, String entries) {
if (isEmpty(entries)) {
return;
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index fdc33c77fe..74edd537f5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -824,7 +824,8 @@ object TestSettings {
// launched by the tests have access to the correct test-time classpath.
envVars in Test ++= Map(
"SPARK_DIST_CLASSPATH" ->
- (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
+ (fullClasspath in Test).value.files.map(_.getAbsolutePath)
+ .mkString(File.pathSeparator).stripSuffix(File.pathSeparator),
"SPARK_PREPEND_CLASSES" -> "1",
"SPARK_SCALA_VERSION" -> scalaBinaryVersion,
"SPARK_TESTING" -> "1",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 07839359a0..119d6e25df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -86,39 +86,31 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
plan
}
- // The tests here are failed on Windows due to the failure of initiating executors
- // by the path length limitation. See SPARK-18718.
test("unsafe broadcast hash join updates peak execution memory") {
- assume(!Utils.isWindows)
testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash join", "inner")
}
test("unsafe broadcast hash outer join updates peak execution memory") {
- assume(!Utils.isWindows)
testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash outer join", "left_outer")
}
test("unsafe broadcast left semi join updates peak execution memory") {
- assume(!Utils.isWindows)
testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast left semi join", "leftsemi")
}
test("broadcast hint isn't bothered by authBroadcastJoinThreshold set to low values") {
- assume(!Utils.isWindows)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
testBroadcastJoin[BroadcastHashJoinExec]("inner", true)
}
}
test("broadcast hint isn't bothered by a disabled authBroadcastJoinThreshold") {
- assume(!Utils.isWindows)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
testBroadcastJoin[BroadcastHashJoinExec]("inner", true)
}
}
test("broadcast hint isn't propagated after a join") {
- assume(!Utils.isWindows)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
@@ -146,7 +138,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
test("broadcast hint is propagated correctly") {
- assume(!Utils.isWindows)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "2"))).toDF("key", "value")
val broadcasted = broadcast(df2)
@@ -167,7 +158,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
test("join key rewritten") {
- assume(!Utils.isWindows)
val l = Literal(1L)
val i = Literal(2)
val s = Literal.create(3, ShortType)