Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SSLOptions.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 75
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/SerDe.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RRunner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 7
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/Connection.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Pool.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ListenerBus.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/Utils.scala | 4
65 files changed, 304 insertions, 340 deletions
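
Every hunk below applies the same migration: the deprecated wildcard import of scala.collection.JavaConversions (implicit conversions) is replaced by scala.collection.JavaConverters, which only adds the explicit .asScala / .asJava decorators. A minimal, self-contained sketch of the pattern (the object and values here are illustrative, not part of the patch):

    import java.util.{List => JList, Map => JMap}
    import scala.collection.JavaConverters._

    object ConvertersSketch {
      // Scala -> Java: the result is a live view over the Scala collection, not a copy.
      def toJavaList(xs: Seq[String]): JList[String] = xs.asJava

      // Java -> Scala: likewise a wrapper; mutations on the Java side stay visible.
      def toScalaMap(m: JMap[String, Int]): Map[String, Int] = m.asScala.toMap
    }

Unlike the old implicits, nothing converts silently at a call site, which is why most hunks only add .asScala or .asJava exactly where a Java API boundary is crossed.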
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index 2389c28b28..fdb309e365 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import scala.Option;
import scala.Product2;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
@@ -160,7 +160,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
*/
@VisibleForTesting
public void write(Iterator<Product2<K, V>> records) throws IOException {
- write(JavaConversions.asScalaIterator(records));
+ write(JavaConverters.asScalaIteratorConverter(records).asScala());
}
@Override
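
Java sources cannot rely on Scala implicits, so the converter object is called directly, as in the hunk above. The same explicit style is also legal from Scala; a sketch (not code from the patch):

    import java.util.{Iterator => JIterator}
    import scala.collection.JavaConverters

    // Explicit converter call, equivalent to the implicit `it.asScala` decoration.
    def asScalaIter[A](it: JIterator[A]): Iterator[A] =
      JavaConverters.asScalaIteratorConverter(it).asScala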
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 92218832d2..a387592783 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,8 +21,8 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, RpcEndpoint}
@@ -398,7 +398,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses: Map[Int, Array[MapStatus]] =
- new ConcurrentHashMap[Int, Array[MapStatus]]
+ new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
}
private[spark] object MapOutputTracker extends Logging {
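
Here .asScala wraps the ConcurrentHashMap in a mutable Scala Map view backed by the same Java map, so the worker keeps its concurrency semantics; roughly (hypothetical key/value types, not the tracker's own):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._
    import scala.collection.mutable

    // A Scala Map view over a Java ConcurrentHashMap; puts and gets go straight
    // through to the underlying concurrent map.
    val statuses: mutable.Map[Int, String] = new ConcurrentHashMap[Int, String]().asScala
    statuses.put(1, "ok")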
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 32df42d57d..3b9c885bf9 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -17,9 +17,11 @@
package org.apache.spark
-import java.io.{File, FileInputStream}
-import java.security.{KeyStore, NoSuchAlgorithmException}
-import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, TrustManagerFactory}
+import java.io.File
+import java.security.NoSuchAlgorithmException
+import javax.net.ssl.SSLContext
+
+import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory
@@ -79,7 +81,6 @@ private[spark] case class SSLOptions(
* object. It can be used then to compose the ultimate Akka configuration.
*/
def createAkkaConfig: Option[Config] = {
- import scala.collection.JavaConversions._
if (enabled) {
Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
@@ -97,7 +98,7 @@ private[spark] case class SSLOptions(
.withValue("akka.remote.netty.tcp.security.protocol",
ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.enabled-algorithms",
- ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq))
+ ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
.withValue("akka.remote.netty.tcp.enable-ssl",
ConfigValueFactory.fromAnyRef(true)))
} else {
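
The Typesafe Config API (ConfigValueFactory.fromIterable) takes a java.lang.Iterable, so the Scala collection now has to be wrapped explicitly; for example (the key path and algorithm value are made up for illustration):

    import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
    import scala.collection.JavaConverters._

    val algorithms = Set("TLS_RSA_WITH_AES_128_CBC_SHA")  // illustrative value
    val sslConf = ConfigFactory.empty()
      .withValue("example.enabled-algorithms", ConfigValueFactory.fromIterable(algorithms.asJava))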
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9849aff85d..f3da04a7f5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,8 +26,8 @@ import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID
+import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@@ -1546,7 +1546,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def getAllPools: Seq[Schedulable] = {
assertNotStopped()
// TODO(xiajunluan): We should take nested pools into account
- taskScheduler.rootPool.schedulableQueue.toSeq
+ taskScheduler.rootPool.schedulableQueue.asScala.toSeq
}
/**
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index a1ebbecf93..888763a3e8 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -19,11 +19,12 @@ package org.apache.spark
import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
+import java.nio.charset.StandardCharsets
+import java.util.Arrays
import java.util.jar.{JarEntry, JarOutputStream}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
-import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -71,7 +72,7 @@ private[spark] object TestUtils {
files.foreach { case (k, v) =>
val entry = new JarEntry(k)
jarStream.putNextEntry(entry)
- ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+ ByteStreams.copy(new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)), jarStream)
}
jarStream.close()
jarFile.toURI.toURL
@@ -125,7 +126,7 @@ private[spark] object TestUtils {
} else {
Seq()
}
- compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()
+ compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call()
val fileName = className + ".class"
val result = new File(fileName)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
index 0ae0b4ec04..891bcddeac 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.java
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.mapred.InputSplit
@@ -37,7 +37,7 @@ class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
def mapPartitionsWithInputSplit[R](
f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] = {
- new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+ new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, b.asJava).asScala,
preservesPartitioning)(fakeClassTag))(fakeClassTag)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
index ec4f3964d7..0f49279f3e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.java
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.InputSplit
@@ -37,7 +37,7 @@ class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
def mapPartitionsWithInputSplit[R](
f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] = {
- new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+ new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, b.asJava).asScala,
preservesPartitioning)(fakeClassTag))(fakeClassTag)
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 8441bb3a30..fb787979c1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.api.java
import java.util.{Comparator, List => JList, Map => JMap}
import java.lang.{Iterable => JIterable}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -142,7 +142,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
- new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
+ new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed))
/**
* Return a subset of this RDD sampled by key (via stratified sampling).
@@ -173,7 +173,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def sampleByKeyExact(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
- new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, seed))
+ new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))
/**
* ::Experimental::
@@ -768,7 +768,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
- def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
+ def lookup(key: K): JList[V] = rdd.lookup(key).asJava
/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
@@ -987,30 +987,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
object JavaPairRDD {
private[spark]
def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
- rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
+ rddToPairRDDFunctions(rdd).mapValues(_.asJava)
}
private[spark]
def cogroupResultToJava[K: ClassTag, V, W](
rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
- rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
+ rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava))
}
private[spark]
def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
: RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
- rddToPairRDDFunctions(rdd)
- .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
+ rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava))
}
private[spark]
def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
: RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
- rddToPairRDDFunctions(rdd)
- .mapValues(x =>
- (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
+ rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava, x._4.asJava))
}
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c582488f16..fc817cdd6a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,7 +21,6 @@ import java.{lang => jl}
import java.lang.{Iterable => JIterable, Long => JLong}
import java.util.{Comparator, List => JList, Iterator => JIterator}
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -59,10 +58,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def rdd: RDD[T]
@deprecated("Use partitions() instead.", "1.1.0")
- def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
+ def splits: JList[Partition] = rdd.partitions.toSeq.asJava
/** Set of partitions in this RDD. */
- def partitions: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
+ def partitions: JList[Partition] = rdd.partitions.toSeq.asJava
/** The partitioner of this RDD. */
def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)
@@ -82,7 +81,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* subclasses of RDD.
*/
def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
- asJavaIterator(rdd.iterator(split, taskContext))
+ rdd.iterator(split, taskContext).asJava
// Transformations (return a new RDD)
@@ -99,7 +98,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsWithIndex[R](
f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
- new JavaRDD(rdd.mapPartitionsWithIndex(((a, b) => f(a, asJavaIterator(b))),
+ new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
preservesPartitioning)(fakeClassTag))(fakeClassTag)
/**
@@ -153,7 +152,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -164,7 +163,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -175,7 +174,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -186,7 +185,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -197,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
@@ -209,7 +208,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -219,14 +218,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
- rdd.foreachPartition((x => f.call(asJavaIterator(x))))
+ rdd.foreachPartition((x => f.call(x.asJava)))
}
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): JavaRDD[JList[T]] =
- new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+ new JavaRDD(rdd.glom().map(_.toSeq.asJava))
/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
@@ -266,13 +265,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String]): JavaRDD[String] =
- rdd.pipe(asScalaBuffer(command))
+ rdd.pipe(command.asScala)
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
- rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
+ rdd.pipe(command.asScala, env.asScala)
/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -294,8 +293,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
- (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
- f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+ (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
@@ -333,22 +331,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return an array that contains all of the elements in this RDD.
*/
- def collect(): JList[T] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[T] = rdd.collect().toSeq
- new java.util.ArrayList(arr)
- }
+ def collect(): JList[T] =
+ rdd.collect().toSeq.asJava
/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this RDD.
*/
- def toLocalIterator(): JIterator[T] = {
- import scala.collection.JavaConversions._
- rdd.toLocalIterator
- }
-
+ def toLocalIterator(): JIterator[T] =
+ asJavaIteratorConverter(rdd.toLocalIterator).asJava
/**
* Return an array that contains all of the elements in this RDD.
@@ -363,9 +355,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
// This is useful for implementing `take` from other language frontends
// like Python where the data is serialized.
- import scala.collection.JavaConversions._
val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
- res.map(x => new java.util.ArrayList(x.toSeq)).toArray
+ res.map(_.toSeq.asJava)
}
/**
@@ -489,20 +480,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
- def take(num: Int): JList[T] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[T] = rdd.take(num).toSeq
- new java.util.ArrayList(arr)
- }
+ def take(num: Int): JList[T] =
+ rdd.take(num).toSeq.asJava
def takeSample(withReplacement: Boolean, num: Int): JList[T] =
takeSample(withReplacement, num, Utils.random.nextLong)
- def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = {
- import scala.collection.JavaConversions._
- val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, seed).toSeq
- new java.util.ArrayList(arr)
- }
+ def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] =
+ rdd.takeSample(withReplacement, num, seed).toSeq.asJava
/**
* Return the first element in this RDD.
@@ -582,10 +567,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* @return an array of top elements
*/
def top(num: Int, comp: Comparator[T]): JList[T] = {
- import scala.collection.JavaConversions._
- val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
- val arr: java.util.Collection[T] = topElems.toSeq
- new java.util.ArrayList(arr)
+ rdd.top(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
}
/**
@@ -607,10 +589,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* @return an array of top elements
*/
def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
- import scala.collection.JavaConversions._
- val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp))
- val arr: java.util.Collection[T] = topElems.toSeq
- new java.util.ArrayList(arr)
+ rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
}
/**
@@ -696,7 +675,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
- new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
+ new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
{ x => null.asInstanceOf[Void] })
}
}
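
The repeated `fn` definitions above all bridge a Spark Java-API functional interface to a Scala Iterator => Iterator function. Stripped of the RDD plumbing, the shape is roughly this (JFlatMap stands in for the real FlatMapFunction, whose 1.x call method returns a java.lang.Iterable):

    import java.{lang => jl, util => ju}
    import scala.collection.JavaConverters._

    // Stand-in for the Java-API interface: takes a Java iterator, returns a Java iterable.
    trait JFlatMap[T, R] extends Serializable {
      def call(in: ju.Iterator[T]): jl.Iterable[R]
    }

    // Expose the Scala iterator to the Java function, then bring the result back to Scala.
    def bridge[T, R](f: JFlatMap[T, R]): Iterator[T] => Iterator[R] =
      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala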
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 02e49a853c..609496ccdf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,8 +21,7 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -104,7 +103,7 @@ class JavaSparkContext(val sc: SparkContext)
*/
def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String]) =
- this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map()))
+ this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment.asScala, Map()))
private[spark] val env = sc.env
@@ -118,7 +117,7 @@ class JavaSparkContext(val sc: SparkContext)
def appName: String = sc.appName
- def jars: util.List[String] = sc.jars
+ def jars: util.List[String] = sc.jars.asJava
def startTime: java.lang.Long = sc.startTime
@@ -142,7 +141,7 @@ class JavaSparkContext(val sc: SparkContext)
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
- sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
+ sc.parallelize(list.asScala, numSlices)
}
/** Get an RDD that has no partitions or elements. */
@@ -161,7 +160,7 @@ class JavaSparkContext(val sc: SparkContext)
: JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[V] = fakeClassTag
- JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
+ JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices))
}
/** Distribute a local Scala collection to form an RDD. */
@@ -170,8 +169,7 @@ class JavaSparkContext(val sc: SparkContext)
/** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
- JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
- numSlices))
+ JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices))
/** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
@@ -519,7 +517,7 @@ class JavaSparkContext(val sc: SparkContext)
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
- val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+ val rdds: Seq[RDD[T]] = (Seq(first) ++ rest.asScala).map(_.rdd)
implicit val ctag: ClassTag[T] = first.classTag
sc.union(rdds)
}
@@ -527,7 +525,7 @@ class JavaSparkContext(val sc: SparkContext)
/** Build the union of two or more RDDs. */
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
- val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+ val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.rdd)
implicit val ctag: ClassTag[(K, V)] = first.classTag
implicit val ctagK: ClassTag[K] = first.kClassTag
implicit val ctagV: ClassTag[V] = first.vClassTag
@@ -536,7 +534,7 @@ class JavaSparkContext(val sc: SparkContext)
/** Build the union of two or more RDDs. */
override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
- val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
+ val rdds: Seq[RDD[Double]] = (Seq(first) ++ rest.asScala).map(_.srdd)
new JavaDoubleRDD(sc.union(rdds))
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index b959b683d1..a7dfa1d257 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,15 +17,17 @@
package org.apache.spark.api.python
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException}
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
-import scala.util.{Failure, Success, Try}
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* :: Experimental ::
@@ -68,7 +70,6 @@ private[python] class WritableToJavaConverter(
* object representation
*/
private def convertWritable(writable: Writable): Any = {
- import collection.JavaConversions._
writable match {
case iw: IntWritable => iw.get()
case dw: DoubleWritable => dw.get()
@@ -89,9 +90,7 @@ private[python] class WritableToJavaConverter(
aw.get().map(convertWritable(_))
case mw: MapWritable =>
val map = new java.util.HashMap[Any, Any]()
- mw.foreach { case (k, v) =>
- map.put(convertWritable(k), convertWritable(v))
- }
+ mw.asScala.foreach { case (k, v) => map.put(convertWritable(k), convertWritable(v)) }
map
case w: Writable => WritableUtils.clone(w, conf.value.value)
case other => other
@@ -122,7 +121,6 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
* supported out-of-the-box.
*/
private def convertToWritable(obj: Any): Writable = {
- import collection.JavaConversions._
obj match {
case i: java.lang.Integer => new IntWritable(i)
case d: java.lang.Double => new DoubleWritable(d)
@@ -134,7 +132,7 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
case null => NullWritable.get()
case map: java.util.Map[_, _] =>
val mapWritable = new MapWritable()
- map.foreach { case (k, v) =>
+ map.asScala.foreach { case (k, v) =>
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
@@ -161,9 +159,8 @@ private[python] object PythonHadoopUtil {
* Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
*/
def mapToConf(map: java.util.Map[String, String]): Configuration = {
- import collection.JavaConversions._
val conf = new Configuration()
- map.foreach{ case (k, v) => conf.set(k, v) }
+ map.asScala.foreach { case (k, v) => conf.set(k, v) }
conf
}
@@ -172,9 +169,8 @@ private[python] object PythonHadoopUtil {
* any matching keys in left
*/
def mergeConfs(left: Configuration, right: Configuration): Configuration = {
- import collection.JavaConversions._
val copy = new Configuration(left)
- right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
+ right.asScala.foreach(entry => copy.set(entry.getKey, entry.getValue))
copy
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2a56bf28d7..b4d152b336 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net._
import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
@@ -66,11 +66,11 @@ private[spark] class PythonRDD(
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
- envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
+ envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
if (reuse_worker) {
- envVars += ("SPARK_REUSE_WORKER" -> "1")
+ envVars.put("SPARK_REUSE_WORKER", "1")
}
- val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+ val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Whether is the worker released into idle pool
@volatile var released = false
@@ -150,7 +150,7 @@ private[spark] class PythonRDD(
// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
if (reuse_worker) {
- env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+ env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)
released = true
}
}
@@ -217,13 +217,13 @@ private[spark] class PythonRDD(
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
// Python includes (*.zip and *.egg files)
- dataOut.writeInt(pythonIncludes.length)
- for (include <- pythonIncludes) {
+ dataOut.writeInt(pythonIncludes.size())
+ for (include <- pythonIncludes.asScala) {
PythonRDD.writeUTF(include, dataOut)
}
// Broadcast variables
val oldBids = PythonRDD.getWorkerBroadcasts(worker)
- val newBids = broadcastVars.map(_.id).toSet
+ val newBids = broadcastVars.asScala.map(_.id).toSet
// number of different broadcasts
val toRemove = oldBids.diff(newBids)
val cnt = toRemove.size + newBids.diff(oldBids).size
@@ -233,7 +233,7 @@ private[spark] class PythonRDD(
dataOut.writeLong(- bid - 1) // bid >= 0
oldBids.remove(bid)
}
- for (broadcast <- broadcastVars) {
+ for (broadcast <- broadcastVars.asScala) {
if (!oldBids.contains(broadcast.id)) {
// send new broadcast
dataOut.writeLong(broadcast.id)
@@ -287,7 +287,7 @@ private[spark] class PythonRDD(
if (!context.isCompleted) {
try {
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
- env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
+ env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
} catch {
case e: Exception =>
logError("Exception when trying to kill worker", e)
@@ -358,10 +358,10 @@ private[spark] object PythonRDD extends Logging {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
- sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions)
+ sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
serveIterator(flattenedPartition.iterator,
- s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+ s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")
}
/**
@@ -819,7 +819,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
val in = socket.getInputStream
val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
out.writeInt(val2.size)
- for (array <- val2) {
+ for (array <- val2.asScala) {
out.writeInt(array.length)
out.write(array)
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 90dacaeb93..31e534f160 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -17,10 +17,10 @@
package org.apache.spark.api.python
-import java.io.{File}
+import java.io.File
import java.util.{List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
@@ -51,7 +51,14 @@ private[spark] object PythonUtils {
* Convert list of T into seq of T (for calling API with varargs)
*/
def toSeq[T](vs: JList[T]): Seq[T] = {
- vs.toList.toSeq
+ vs.asScala
+ }
+
+ /**
+ * Convert list of T into a (Scala) List of T
+ */
+ def toList[T](vs: JList[T]): List[T] = {
+ vs.asScala.toList
}
/**
@@ -65,6 +72,6 @@ private[spark] object PythonUtils {
* Convert java map of K, V into Map of K, V (for calling API with varargs)
*/
def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
- jm.toMap
+ jm.asScala.toMap
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index e314408c06..7039b734d2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -19,9 +19,10 @@ package org.apache.spark.api.python
import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.util.Arrays
import scala.collection.mutable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark._
import org.apache.spark.util.{RedirectThread, Utils}
@@ -108,9 +109,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+ val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
- workerEnv.putAll(envVars)
+ workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
@@ -151,9 +152,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+ val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
- workerEnv.putAll(envVars)
+ workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 1f1debcf84..fd27276e70 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -22,7 +22,6 @@ import java.util.{ArrayList => JArrayList}
import org.apache.spark.api.java.JavaRDD
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Failure
@@ -214,7 +213,7 @@ private[spark] object SerDeUtil extends Logging {
new AutoBatchedPickler(cleaned)
} else {
val pickle = new Pickler
- cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+ cleaned.grouped(batchSize).map(batched => pickle.dumps(batched.asJava))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index 8f30ff9202..ee1fb056f0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -20,6 +20,8 @@ package org.apache.spark.api.python
import java.io.{DataOutput, DataInput}
import java.{util => ju}
+import scala.collection.JavaConverters._
+
import com.google.common.base.Charsets.UTF_8
import org.apache.hadoop.io._
@@ -62,10 +64,9 @@ private[python] class TestInputKeyConverter extends Converter[Any, Any] {
}
private[python] class TestInputValueConverter extends Converter[Any, Any] {
- import collection.JavaConversions._
override def convert(obj: Any): ju.List[Double] = {
val m = obj.asInstanceOf[MapWritable]
- seqAsJavaList(m.keySet.map(w => w.asInstanceOf[DoubleWritable].get()).toSeq)
+ m.keySet.asScala.map(_.asInstanceOf[DoubleWritable].get()).toSeq.asJava
}
}
@@ -76,9 +77,8 @@ private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
}
private[python] class TestOutputValueConverter extends Converter[Any, Any] {
- import collection.JavaConversions._
override def convert(obj: Any): DoubleWritable = {
- new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
+ new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().iterator().next())
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 1cf2824f86..9d5bbb5d60 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -19,9 +19,10 @@ package org.apache.spark.api.r
import java.io._
import java.net.{InetAddress, ServerSocket}
+import java.util.Arrays
import java.util.{Map => JMap}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.Try
@@ -365,11 +366,11 @@ private[r] object RRDD {
sparkConf.setIfMissing("spark.master", "local")
}
- for ((name, value) <- sparkEnvirMap) {
- sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
+ for ((name, value) <- sparkEnvirMap.asScala) {
+ sparkConf.set(name.toString, value.toString)
}
- for ((name, value) <- sparkExecutorEnvMap) {
- sparkConf.setExecutorEnv(name.asInstanceOf[String], value.asInstanceOf[String])
+ for ((name, value) <- sparkExecutorEnvMap.asScala) {
+ sparkConf.setExecutorEnv(name.toString, value.toString)
}
val jsc = new JavaSparkContext(sparkConf)
@@ -395,7 +396,7 @@ private[r] object RRDD {
val rOptions = "--vanilla"
val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = rLibDir + "/SparkR/worker/" + script
- val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
+ val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
// Unset the R_TESTS environment variable for workers.
// This is set by R CMD check as startup.Rs
// (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 427b2bc7cb..9e807cc52f 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -18,8 +18,7 @@
package org.apache.spark.api.r
import java.io.File
-
-import scala.collection.JavaConversions._
+import java.util.Arrays
import org.apache.spark.{SparkEnv, SparkException}
@@ -68,7 +67,7 @@ private[spark] object RUtils {
/** Check if R is installed before running tests that use R commands. */
def isRInstalled: Boolean = {
try {
- val builder = new ProcessBuilder(Seq("R", "--version"))
+ val builder = new ProcessBuilder(Arrays.asList("R", "--version"))
builder.start().waitFor() == 0
} catch {
case e: Exception => false
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 3c89f24473..dbbbcf40c1 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -20,7 +20,7 @@ package org.apache.spark.api.r
import java.io.{DataInputStream, DataOutputStream}
import java.sql.{Timestamp, Date, Time}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Utility functions to serialize, deserialize objects to / from R
@@ -165,7 +165,7 @@ private[spark] object SerDe {
val valueType = readObjectType(in)
readTypedObject(in, valueType)
})
- mapAsJavaMap(keys.zip(values).toMap)
+ keys.zip(values).toMap.asJava
} else {
new java.util.HashMap[Object, Object]()
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index a0c9b5e63c..7e3764d802 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -20,7 +20,7 @@ package org.apache.spark.broadcast
import java.io._
import java.nio.ByteBuffer
-import scala.collection.JavaConversions.asJavaEnumeration
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.Random
@@ -210,7 +210,7 @@ private object TorrentBroadcast extends Logging {
compressionCodec: Option[CompressionCodec]): T = {
require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
val is = new SequenceInputStream(
- asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block))))
+ blocks.iterator.map(new ByteBufferInputStream(_)).asJavaEnumeration)
val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
val ser = serializer.newInstance()
val serIn = ser.deserializeStream(in)
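
SequenceInputStream wants a java.util.Enumeration of streams, so the old JavaConversions.asJavaEnumeration call becomes the JavaConverters decorator on the iterator; in isolation (toy byte arrays, not broadcast blocks):

    import java.io.{ByteArrayInputStream, SequenceInputStream}
    import scala.collection.JavaConverters._

    // Concatenate several in-memory chunks into one InputStream via a Java Enumeration.
    val chunks = Seq(Array[Byte](1, 2, 3), Array[Byte](4, 5))
    val combined = new SequenceInputStream(
      chunks.iterator.map(new ByteArrayInputStream(_)).asJavaEnumeration)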
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 22ef701d83..6840a3ae83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -19,13 +19,13 @@ package org.apache.spark.deploy
import java.util.concurrent.CountDownLatch
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslServerBootstrap
-import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.Utils
@@ -67,13 +67,13 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
def start() {
require(server == null, "Shuffle server already started")
logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
- val bootstraps =
+ val bootstraps: Seq[TransportServerBootstrap] =
if (useSasl) {
Seq(new SaslServerBootstrap(transportConf, securityManager))
} else {
Nil
}
- server = transportContext.createServer(port, bootstraps)
+ server = transportContext.createServer(port, bootstraps.asJava)
}
/** Clean up all shuffle files associated with an application that has exited. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 23d01e9cbb..d85327603f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -21,7 +21,7 @@ import java.net.URI
import java.io.File
import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.spark.SparkUserAppException
@@ -71,7 +71,7 @@ object PythonRunner {
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
// Launch Python process
- val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
+ val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index ed1e972955..4b28866dca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -22,7 +22,7 @@ import java.util.jar.JarFile
import java.util.logging.Level
import java.util.zip.{ZipEntry, ZipOutputStream}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.io.{ByteStreams, Files}
@@ -110,7 +110,7 @@ private[deploy] object RPackageUtils extends Logging {
print(s"Building R package with the command: $installCmd", printStream)
}
try {
- val builder = new ProcessBuilder(installCmd)
+ val builder = new ProcessBuilder(installCmd.asJava)
builder.redirectErrorStream(true)
val env = builder.environment()
env.clear()
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index c0cab22fa8..05b954ce36 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
import java.io._
import java.util.concurrent.{Semaphore, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
@@ -68,7 +68,7 @@ object RRunner {
if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
// Launch R
val returnCode = try {
- val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
+ val builder = new ProcessBuilder((Seq(rCommand, rFileNormalized) ++ otherArgs).asJava)
val env = builder.environment()
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
index b8d3993540..8d5e716e6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
@@ -57,7 +57,7 @@ private[spark] object SparkCuratorUtil extends Logging {
def deleteRecursive(zk: CuratorFramework, path: String) {
if (zk.checkExists().forPath(path) != null) {
- for (child <- zk.getChildren.forPath(path)) {
+ for (child <- zk.getChildren.forPath(path).asScala) {
zk.delete().forPath(path + "/" + child)
}
zk.delete().forPath(path)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index dda4216c7e..f7723ef5bd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,7 +22,7 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal
@@ -71,7 +71,7 @@ class SparkHadoopUtil extends Logging {
}
def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
- for (token <- source.getTokens()) {
+ for (token <- source.getTokens.asScala) {
dest.addToken(token)
}
}
@@ -175,8 +175,8 @@ class SparkHadoopUtil extends Logging {
}
private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
- val stats = FileSystem.getAllStatistics()
- stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+ FileSystem.getAllStatistics.asScala.map(
+ Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}
private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
@@ -306,12 +306,13 @@ class SparkHadoopUtil extends Logging {
val renewalInterval =
sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
- credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ credentials.getAllTokens.asScala
+ .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.map { t =>
- val identifier = new DelegationTokenIdentifier()
- identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
- (identifier.getIssueDate + fraction * renewalInterval).toLong - now
- }.foldLeft(0L)(math.max)
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+ }.foldLeft(0L)(math.max)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 3f3c6627c2..18a1c52ae5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -23,7 +23,7 @@ import java.net.URI
import java.util.{List => JList}
import java.util.jar.JarFile
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Source
@@ -94,7 +94,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Set parameters from command line arguments
try {
- parse(args.toList)
+ parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
@@ -458,7 +458,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
override protected def handleExtraArgs(extra: JList[String]): Unit = {
- childArgs ++= extra
+ childArgs ++= extra.asScala
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 563831cc6b..540e802420 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
import java.nio.ByteBuffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.curator.framework.CuratorFramework
@@ -49,8 +49,8 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
}
override def read[T: ClassTag](prefix: String): Seq[T] = {
- val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
- file.map(deserializeFromFile[T]).flatten
+ zk.getChildren.forPath(WORKING_DIR).asScala
+ .filter(_.startsWith(prefix)).map(deserializeFromFile[T]).flatten
}
override def close() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 45a3f43045..ce02ee203a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -18,9 +18,8 @@
package org.apache.spark.deploy.worker
import java.io.{File, FileOutputStream, InputStream, IOException}
-import java.lang.System._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.Map
import org.apache.spark.Logging
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
- cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments
+ cmd.asScala ++ Seq(command.mainClass) ++ command.arguments
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index ec51c3d935..89159ff5e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
import java.io._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -172,8 +172,8 @@ private[deploy] class DriverRunner(
CommandUtils.redirectStream(process.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
- val header = "Launch Command: %s\n%s\n\n".format(
- builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
+ val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
+ val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
Files.append(header, stderr, UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
@@ -229,6 +229,6 @@ private[deploy] trait ProcessBuilderLike {
private[deploy] object ProcessBuilderLike {
def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
override def start(): Process = processBuilder.start()
- override def command: Seq[String] = processBuilder.command()
+ override def command: Seq[String] = processBuilder.command().asScala
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index ab3fea475c..3aef0515cb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
import java.io._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -129,7 +129,8 @@ private[deploy] class ExecutorRunner(
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
- logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+ val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
+ logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
@@ -145,7 +146,7 @@ private[deploy] class ExecutorRunner(
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
- command.mkString("\"", "\" \"", "\""), "=" * 40)
+ formattedCommand, "=" * 40)
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 79b1536d94..770927c80f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -24,7 +24,6 @@ import java.util.{UUID, Date}
import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
-import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.Random
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 42a85e42ea..c3491bb8b1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,7 +23,7 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal
@@ -147,7 +147,7 @@ private[spark] class Executor(
/** Returns the total amount of time this JVM process has spent in garbage collection. */
private def computeTotalGcTime(): Long = {
- ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
+ ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum
}
class TaskRunner(
@@ -425,7 +425,7 @@ private[spark] class Executor(
val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
val curGCTime = computeTotalGcTime()
- for (taskRunner <- runningTasks.values()) {
+ for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
metrics.updateShuffleReadMetrics()
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 293c512f8b..d16f4a1fc4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -19,7 +19,7 @@ package org.apache.spark.executor
import java.util.concurrent.ThreadPoolExecutor
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem
@@ -30,7 +30,7 @@ private[spark]
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
- FileSystem.getAllStatistics().find(s => s.getScheme.equals(scheme))
+ FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme))
private def registerFileSystemStat[T](
scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index cfd672e1d8..0474fd2ccc 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
@@ -28,7 +28,7 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.cluster.mesos.{MesosTaskLaunchData}
+import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
import org.apache.spark.util.{SignalLogger, Utils}
private[spark] class MesosExecutorBackend
@@ -55,7 +55,7 @@ private[spark] class MesosExecutorBackend
slaveInfo: SlaveInfo) {
// Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
- val cpusPerTask = executorInfo.getResourcesList
+ val cpusPerTask = executorInfo.getResourcesList.asScala
.find(_.getName == "cpus")
.map(_.getScalar.getValue.toInt)
.getOrElse(0)
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 6cda7772f7..a5ad47293f 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.input
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
@@ -44,12 +44,9 @@ private[spark] abstract class StreamFileInputFormat[T]
* which is set through setMaxSplitSize
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
- val files = listStatus(context)
- val totalLen = files.map { file =>
- if (file.isDir) 0L else file.getLen
- }.sum
-
- val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+ val files = listStatus(context).asScala
+ val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+ val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
super.setMaxSplitSize(maxSplitSize)
}
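With toy numbers (purely illustrative), the split-size arithmetic above works out as follows: three files totalling 12 MB yield a 4 MB maximum split size.

    val fileLengths = Seq(4L, 5L, 3L).map(_ * 1024 * 1024)
    val totalLen = fileLengths.sum                                          // 12582912
    val maxSplitSize = Math.ceil(totalLen * 1.0 / fileLengths.size).toLong  // 4194304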
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index aaef7c74ee..1ba34a1141 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -17,7 +17,7 @@
package org.apache.spark.input
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
@@ -52,10 +52,8 @@ private[spark] class WholeTextFileInputFormat
* which is set through setMaxSplitSize
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
- val files = listStatus(context)
- val totalLen = files.map { file =>
- if (file.isDir) 0L else file.getLen
- }.sum
+ val files = listStatus(context).asScala
+ val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 9be98723ae..0c096656f9 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.launcher
import java.io.File
import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.deploy.Command
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.Command
private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, command: Command)
extends AbstractCommandBuilder {
- childEnv.putAll(command.environment)
+ childEnv.putAll(command.environment.asJava)
childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sparkHome)
override def buildCommand(env: JMap[String, String]): JList[String] = {
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index d7495551ad..dd2d325d87 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -20,6 +20,7 @@ package org.apache.spark.metrics
import java.io.{FileInputStream, InputStream}
import java.util.Properties
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
@@ -58,25 +59,20 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
propertyCategories = subProperties(properties, INSTANCE_REGEX)
if (propertyCategories.contains(DEFAULT_PREFIX)) {
- import scala.collection.JavaConversions._
-
- val defaultProperty = propertyCategories(DEFAULT_PREFIX)
- for { (inst, prop) <- propertyCategories
- if (inst != DEFAULT_PREFIX)
- (k, v) <- defaultProperty
- if (prop.getProperty(k) == null) } {
- prop.setProperty(k, v)
+ val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala
+ for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX);
+ (k, v) <- defaultProperty if (prop.get(k) == null)) {
+ prop.put(k, v)
}
}
}
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
- import scala.collection.JavaConversions._
- prop.foreach { kv =>
- if (regex.findPrefixOf(kv._1).isDefined) {
- val regex(prefix, suffix) = kv._1
- subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+ prop.asScala.foreach { kv =>
+ if (regex.findPrefixOf(kv._1.toString).isDefined) {
+ val regex(prefix, suffix) = kv._1.toString
+ subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
}
}
subProperties
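A self-contained sketch of the Properties-to-Scala pattern above; the property names and the prefix regex here are examples only, not the values defined in MetricsConfig.

    import java.util.Properties
    import scala.collection.JavaConverters._

    val props = new Properties()
    props.setProperty("master.sink.console.period", "10")
    props.setProperty("*.sink.servlet.class", "example.MetricsServlet")
    val instanceRegex = "^(\\*|[a-zA-Z]+)\\.(.+)".r
    props.asScala.foreach { case (key, value) =>
      if (instanceRegex.findPrefixOf(key).isDefined) {
        val instanceRegex(prefix, suffix) = key
        println(s"$prefix -> $suffix = $value")   // e.g. master -> sink.console.period = 10
      }
    }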
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index b089da8596..7c170a742f 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network.netty
import java.nio.ByteBuffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.Logging
import org.apache.spark.network.BlockDataManager
@@ -55,7 +55,7 @@ class NettyBlockRpcServer(
case openBlocks: OpenBlocks =>
val blocks: Seq[ManagedBuffer] =
openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
- val streamId = streamManager.registerStream(blocks.iterator)
+ val streamId = streamManager.registerStream(blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index d650d5fe73..ff8aae9ebe 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -17,7 +17,7 @@
package org.apache.spark.network.netty
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import org.apache.spark.{SecurityManager, SparkConf}
@@ -58,7 +58,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
securityManager.isSaslEncryptionEnabled()))
}
transportContext = new TransportContext(transportConf, rpcHandler)
- clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
+ clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
logInfo("Server created on " + server.getPort)
@@ -67,7 +67,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
- val server = transportContext.createServer(port, bootstraps)
+ val server = transportContext.createServer(port, bootstraps.asJava)
(server, server.getPort)
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
index 1499da07bb..8d9ebadaf7 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -23,7 +23,7 @@ import java.nio.channels._
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.LinkedList
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal
@@ -145,7 +145,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
}
def callOnExceptionCallbacks(e: Throwable) {
- onExceptionCallbacks foreach {
+ onExceptionCallbacks.asScala.foreach {
callback =>
try {
callback(this, e)
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index 91b07ce3af..5afce75680 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.partial
import java.util.{HashMap => JHashMap}
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
@@ -48,9 +48,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
if (outputsMerged == totalOutputs) {
val result = new JHashMap[T, BoundedDouble](sums.size)
sums.foreach { case (key, sum) =>
- result(key) = new BoundedDouble(sum, 1.0, sum, sum)
+ result.put(key, new BoundedDouble(sum, 1.0, sum, sum))
}
- result
+ result.asScala
} else if (outputsMerged == 0) {
new HashMap[T, BoundedDouble]
} else {
@@ -64,9 +64,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
val stdev = math.sqrt(variance)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
- result(key) = new BoundedDouble(mean, confidence, low, high)
+ result.put(key, new BoundedDouble(mean, confidence, low, high))
}
- result
+ result.asScala
}
}
}
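Worth noting about the result.asScala return value: .asScala over a java.util.HashMap is a live, zero-copy view, so the Scala map returned by the evaluator reflects the underlying Java map. A minimal sketch:

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._

    val counts = new JHashMap[String, Long]()
    counts.put("a", 3L)
    val view: scala.collection.mutable.Map[String, Long] = counts.asScala
    counts.put("b", 1L)            // later puts on the Java map...
    println(view("b"))             // ...are visible through the Scala view: prints 1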
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
index af26c3d59a..a164040684 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.partial
import java.util.{HashMap => JHashMap}
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.HashMap
@@ -55,9 +55,9 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
while (iter.hasNext) {
val entry = iter.next()
val mean = entry.getValue.mean
- result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean)
+ result.put(entry.getKey, new BoundedDouble(mean, 1.0, mean, mean))
}
- result
+ result.asScala
} else if (outputsMerged == 0) {
new HashMap[T, BoundedDouble]
} else {
@@ -72,9 +72,9 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
val confFactor = studentTCacher.get(counter.count)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
- result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+ result.put(entry.getKey, new BoundedDouble(mean, confidence, low, high))
}
- result
+ result.asScala
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
index 442fb86227..54a1beab35 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.partial
import java.util.{HashMap => JHashMap}
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.HashMap
@@ -55,9 +55,9 @@ private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Doubl
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getValue.sum
- result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+ result.put(entry.getKey, new BoundedDouble(sum, 1.0, sum, sum))
}
- result
+ result.asScala
} else if (outputsMerged == 0) {
new HashMap[T, BoundedDouble]
} else {
@@ -80,9 +80,9 @@ private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Doubl
val confFactor = studentTCacher.get(counter.count)
val low = sumEstimate - confFactor * sumStdev
val high = sumEstimate + confFactor * sumStdev
- result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high)
+ result.put(entry.getKey, new BoundedDouble(sumEstimate, confidence, low, high))
}
- result
+ result.asScala
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 326fafb230..4e5f2e8a5d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -22,7 +22,7 @@ import java.text.SimpleDateFormat
import java.util.{Date, HashMap => JHashMap}
import scala.collection.{Map, mutable}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.DynamicVariable
@@ -312,14 +312,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
} : Iterator[JHashMap[K, V]]
val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
- m2.foreach { pair =>
+ m2.asScala.foreach { pair =>
val old = m1.get(pair._1)
m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
m1
} : JHashMap[K, V]
- self.mapPartitions(reducePartition).reduce(mergeMaps)
+ self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
}
/** Alias for reduceByKeyLocally */
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 3bb9998e1d..afbe566b76 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -23,7 +23,7 @@ import java.io.IOException
import java.io.PrintWriter
import java.util.StringTokenizer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
@@ -72,7 +72,7 @@ private[spark] class PipedRDD[T: ClassTag](
}
override def compute(split: Partition, context: TaskContext): Iterator[String] = {
- val pb = new ProcessBuilder(command)
+ val pb = new ProcessBuilder(command.asJava)
// Add the environmental variables to the process.
val currentEnvVars = pb.environment()
envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }
@@ -81,7 +81,7 @@ private[spark] class PipedRDD[T: ClassTag](
// so the user code can access the input filename
if (split.isInstanceOf[HadoopPartition]) {
val hadoopSplit = split.asInstanceOf[HadoopPartition]
- currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
+ currentEnvVars.putAll(hadoopSplit.getPipeEnvVars().asJava)
}
// When spark.worker.separated.working.directory option is turned on, each
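A small standalone sketch of the command.asJava usage above: ProcessBuilder's constructor takes a java.util.List[String], so a Scala Seq needs the explicit conversion (the command and environment variable below are placeholders).

    import scala.collection.JavaConverters._

    val command = Seq("echo", "hello")
    val pb = new ProcessBuilder(command.asJava)       // java.util.List[String] expected here
    pb.environment().put("EXAMPLE_VAR", "1")          // placeholder variable, not from the patch
    val exitCode = pb.start().waitFor()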
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index f7cb1791d4..9a4fa301b0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
import java.util.{HashMap => JHashMap}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -125,7 +125,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
integrate(0, t => getSeq(t._1) += t._2)
// the second dep is rdd2; remove all of its keys
integrate(1, t => map.remove(t._1))
- map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
+ map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
}
override def clearDependencies() {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index bac37bfdaa..0e438ab436 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.immutable.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -107,7 +107,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
val retval = new ArrayBuffer[SplitInfo]()
val list = instance.getSplits(job)
- for (split <- list) {
+ for (split <- list.asScala) {
retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 174b73221a..5821afea98 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
@@ -74,7 +74,7 @@ private[spark] class Pool(
if (schedulableNameToSchedulable.containsKey(schedulableName)) {
return schedulableNameToSchedulable.get(schedulableName)
}
- for (schedulable <- schedulableQueue) {
+ for (schedulable <- schedulableQueue.asScala) {
val sched = schedulable.getSchedulableByName(schedulableName)
if (sched != null) {
return sched
@@ -84,12 +84,12 @@ private[spark] class Pool(
}
override def executorLost(executorId: String, host: String) {
- schedulableQueue.foreach(_.executorLost(executorId, host))
+ schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
}
override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
- for (schedulable <- schedulableQueue) {
+ for (schedulable <- schedulableQueue.asScala) {
shouldRevive |= schedulable.checkSpeculatableTasks()
}
shouldRevive
@@ -98,7 +98,7 @@ private[spark] class Pool(
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
- schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
+ schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d6e1e9e5be..452c32d541 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
import com.google.common.collect.HashBiMap
@@ -233,7 +233,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
stateLock.synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
- for (offer <- offers) {
+ for (offer <- offers.asScala) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.getValue
@@ -251,21 +251,21 @@ private[spark] class CoarseMesosSchedulerBackend(
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
- taskIdToSlaveId(taskId) = slaveId
+ taskIdToSlaveId.put(taskId, slaveId)
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val (_, memResourcesToUse) =
- partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
+ partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
- .addAllResources(cpuResourcesToUse)
- .addAllResources(memResourcesToUse)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
@@ -314,9 +314,9 @@ private[spark] class CoarseMesosSchedulerBackend(
}
if (TaskState.isFinished(TaskState.fromMesos(state))) {
- val slaveId = taskIdToSlaveId(taskId)
+ val slaveId = taskIdToSlaveId.get(taskId)
slaveIdsWithExecutors -= slaveId
- taskIdToSlaveId -= taskId
+ taskIdToSlaveId.remove(taskId)
// Remove the cores we have remembered for this task, if it's in the hashmap
for (cores <- coresByTaskId.get(taskId)) {
totalCoresAcquired -= cores
@@ -361,7 +361,7 @@ private[spark] class CoarseMesosSchedulerBackend(
stateLock.synchronized {
if (slaveIdsWithExecutors.contains(slaveId)) {
val slaveIdToTaskId = taskIdToSlaveId.inverse()
- if (slaveIdToTaskId.contains(slaveId)) {
+ if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
@@ -411,7 +411,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveIdToTaskId = taskIdToSlaveId.inverse()
for (executorId <- executorIds) {
val slaveId = executorId.split("/")(0)
- if (slaveIdToTaskId.contains(slaveId)) {
+ if (slaveIdToTaskId.containsKey(slaveId)) {
mesosDriver.killTask(
TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
pendingRemovedSlaveIds += slaveId
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
index 3efc536f14..e0c547dce6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster.mesos
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
@@ -129,6 +129,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
}
override def fetchAll[T](): Iterable[T] = {
- zk.getChildren.forPath(WORKING_DIR).map(fetch[T]).flatten
+ zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1206f184fb..07da9242b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Date, List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -350,7 +350,7 @@ private[spark] class MesosClusterScheduler(
}
// TODO: Page the status updates to avoid trying to reconcile
// a large amount of tasks at once.
- driver.reconcileTasks(statuses)
+ driver.reconcileTasks(statuses.toSeq.asJava)
}
}
}
@@ -493,10 +493,10 @@ private[spark] class MesosClusterScheduler(
}
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
- val currentOffers = offers.map { o =>
+ val currentOffers = offers.asScala.map(o =>
new ResourceOffer(
o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
- }.toList
+ ).toList
logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
val currentTime = new Date()
@@ -521,10 +521,10 @@ private[spark] class MesosClusterScheduler(
currentOffers,
tasks)
}
- tasks.foreach { case (offerId, tasks) =>
- driver.launchTasks(Collections.singleton(offerId), tasks)
+ tasks.foreach { case (offerId, taskInfos) =>
+ driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
}
- offers
+ offers.asScala
.filter(o => !tasks.keySet.contains(o.getId))
.foreach(o => driver.declineOffer(o.getId))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 5c20606d58..2e424054be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{ArrayList => JArrayList, Collections, List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.{Scheduler => MScheduler, _}
@@ -129,14 +129,12 @@ private[spark] class MesosSchedulerBackend(
val (resourcesAfterCpu, usedCpuResources) =
partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
val (resourcesAfterMem, usedMemResources) =
- partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))
+ partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
- builder.addAllResources(usedCpuResources)
- builder.addAllResources(usedMemResources)
+ builder.addAllResources(usedCpuResources.asJava)
+ builder.addAllResources(usedMemResources.asJava)
- sc.conf.getOption("spark.mesos.uris").map { uris =>
- setupUris(uris, command)
- }
+ sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -148,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}
- (executorInfo.build(), resourcesAfterMem)
+ (executorInfo.build(), resourcesAfterMem.asJava)
}
/**
@@ -193,7 +191,7 @@ private[spark] class MesosSchedulerBackend(
private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
val builder = new StringBuilder
- tasks.foreach { t =>
+ tasks.asScala.foreach { t =>
builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
.append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
.append("Task resources: ").append(t.getResourcesList).append("\n")
@@ -211,7 +209,7 @@ private[spark] class MesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
// Fail-fast on offers we know will be rejected
- val (usableOffers, unUsableOffers) = offers.partition { o =>
+ val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
@@ -323,10 +321,10 @@ private[spark] class MesosSchedulerBackend(
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(executorInfo)
.setName(task.name)
- .addAllResources(cpuResources)
+ .addAllResources(cpuResources.asJava)
.setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
.build()
- (taskInfo, finalResources)
+ (taskInfo, finalResources.asJava)
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 5b854aa5c2..860c8e097b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
@@ -137,7 +137,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
protected def getResource(res: JList[Resource], name: String): Double = {
// A resource can have multiple values in the offer since it can either be from
// a specific role or wildcard.
- res.filter(_.getName == name).map(_.getScalar.getValue).sum
+ res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}
protected def markRegistered(): Unit = {
@@ -169,7 +169,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
amountToUse: Double): (List[Resource], List[Resource]) = {
var remain = amountToUse
var requestedResources = new ArrayBuffer[Resource]
- val remainingResources = resources.map {
+ val remainingResources = resources.asScala.map {
case r => {
if (remain > 0 &&
r.getType == Value.Type.SCALAR &&
@@ -214,7 +214,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* @return
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
- offerAttributes.map(attr => {
+ offerAttributes.asScala.map(attr => {
val attrValue = attr.getType match {
case Value.Type.SCALAR => attr.getScalar
case Value.Type.RANGES => attr.getRanges
@@ -253,7 +253,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
requiredValues.map(_.toLong).exists(offerRange.contains(_))
case Some(offeredValue: Value.Set) =>
// check if the specified required values is a subset of offered set
- requiredValues.subsetOf(offeredValue.getItemList.toSet)
+ requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet)
case Some(textValue: Value.Text) =>
// check if the specified value is equal, if multiple values are specified
// we succeed if any of them match.
@@ -299,14 +299,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
Map()
} else {
try {
- Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
- case (k, v) =>
- if (v == null || v.isEmpty) {
- (k, Set[String]())
- } else {
- (k, v.split(',').toSet)
- }
- }
+ splitter.split(constraintsVal).asScala.toMap.mapValues(v =>
+ if (v == null || v.isEmpty) {
+ Set[String]()
+ } else {
+ v.split(',').toSet
+ }
+ )
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
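The asScala.toMap.mapValues chain above can be exercised in isolation; the constraint keys and values below are invented for illustration. Note that mapValues in Scala 2.x returns a lazy view over the map.

    import scala.collection.JavaConverters._

    val raw = new java.util.HashMap[String, String]()     // stand-in for splitter.split(...)
    raw.put("zone", "us-east-1a,us-east-1b")
    raw.put("rack", "")
    val constraints: Map[String, Set[String]] =
      raw.asScala.toMap.mapValues { v =>
        if (v == null || v.isEmpty) Set.empty[String] else v.split(',').toSet
      }
    println(constraints("zone"))                          // Set(us-east-1a, us-east-1b)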
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 0ff7562e91..048a938507 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,6 +21,7 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import com.esotericsoftware.kryo.{Kryo, KryoException}
@@ -373,16 +374,15 @@ private class JavaIterableWrapperSerializer
override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
: java.lang.Iterable[_] = {
kryo.readClassAndObject(in) match {
- case scalaIterable: Iterable[_] =>
- scala.collection.JavaConversions.asJavaIterable(scalaIterable)
- case javaIterable: java.lang.Iterable[_] =>
- javaIterable
+ case scalaIterable: Iterable[_] => scalaIterable.asJava
+ case javaIterable: java.lang.Iterable[_] => javaIterable
}
}
}
private object JavaIterableWrapperSerializer extends Logging {
- // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper).
+ // The class returned by JavaConverters.asJava
+ // (scala.collection.convert.Wrappers$IterableWrapper).
val wrapperClass =
scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
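A quick check of the wrapper class mentioned in the comment above, assuming the collection is statically typed as Iterable so that the asJavaIterable conversion is selected:

    import scala.collection.JavaConverters._

    val javaIterable: java.lang.Iterable[Int] = (Seq(1, 2, 3): Iterable[Int]).asJava
    // Expected to print scala.collection.convert.Wrappers$IterableWrapper,
    // the class that JavaIterableWrapperSerializer matches on.
    println(javaIterable.getClass.getName)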
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index f6a96d81e7..c057de9b3f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
@@ -210,11 +210,13 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
shuffleStates.get(shuffleId) match {
case Some(state) =>
if (consolidateShuffleFiles) {
- for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+ for (fileGroup <- state.allFileGroups.asScala;
+ file <- fileGroup.files) {
file.delete()
}
} else {
- for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
+ for (mapId <- state.completedMapTasks.asScala;
+ reduceId <- 0 until state.numBuckets) {
val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
blockManager.diskBlockManager.getFile(blockId).delete()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 6fec524070..7db6035553 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.immutable.HashSet
import scala.collection.mutable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, ThreadSafeRpcEndpoint}
@@ -133,7 +133,7 @@ class BlockManagerMasterEndpoint(
// Find all blocks for the given RDD, remove the block from both blockLocations and
// the blockManagerInfo that is tracking the blocks.
- val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+ val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
@@ -242,7 +242,7 @@ class BlockManagerMasterEndpoint(
private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
- new StorageStatus(blockManagerId, info.maxMem, info.blocks)
+ new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
}.toArray
}
@@ -292,7 +292,7 @@ class BlockManagerMasterEndpoint(
if (askSlaves) {
info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
} else {
- Future { info.blocks.keys.filter(filter).toSeq }
+ Future { info.blocks.asScala.keys.filter(filter).toSeq }
}
future
}
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 78e7ddc27d..1738258a0c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util
-import scala.collection.JavaConversions.mapAsJavaMap
+import scala.collection.JavaConverters._
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
.getOrElse(ConfigFactory.empty())
- val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])
+ val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
.withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
s"""
|akka.daemonic = on
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index a725767d08..13cb516b58 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -19,12 +19,11 @@ package org.apache.spark.util
import java.util.concurrent.CopyOnWriteArrayList
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.spark.Logging
-import org.apache.spark.scheduler.SparkListener
/**
* An event bus which posts events to its listeners.
@@ -46,7 +45,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
* `postToAll` in the same thread for all events.
*/
final def postToAll(event: E): Unit = {
- // JavaConversions will create a JIterableWrapper if we use some Scala collection functions.
+ // JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
@@ -69,7 +68,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
val c = implicitly[ClassTag[T]].runtimeClass
- listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
+ listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}
}
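A sketch of the trade-off described in the postToAll comment above, with placeholder listeners: the hot path iterates the CopyOnWriteArrayList with its Java iterator, while the cold path can afford the asScala wrapper.

    import java.util.concurrent.CopyOnWriteArrayList
    import scala.collection.JavaConverters._

    val listeners = new CopyOnWriteArrayList[AnyRef]()
    listeners.add("string-listener")
    listeners.add(Integer.valueOf(42))

    // Hot path: plain Java iterator, no per-call Scala wrapper allocation.
    val it = listeners.iterator()
    while (it.hasNext) { println(it.next()) }

    // Cold path: the wrapper cost of asScala is acceptable for occasional lookups.
    val stringsOnly = listeners.asScala.filter(_.getClass == classOf[String])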
diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
index 169489df6c..a1c33212cd 100644
--- a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -21,8 +21,6 @@ import java.net.{URLClassLoader, URL}
import java.util.Enumeration
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions._
-
/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
*/
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 8de75ba9a9..d7e5143c30 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -21,7 +21,8 @@ import java.util.Set
import java.util.Map.Entry
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.{JavaConversions, mutable}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.spark.Logging
@@ -50,8 +51,7 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
}
def iterator: Iterator[(A, B)] = {
- val jIterator = getEntrySet.iterator
- JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue.value))
+ getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value))
}
def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet
@@ -90,9 +90,7 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
}
override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = {
- JavaConversions.mapAsScalaConcurrentMap(internalMap)
- .map { case (k, TimeStampedValue(v, t)) => (k, v) }
- .filter(p)
+ internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p)
}
override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]()
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
index 7cd8f28b12..65efeb1f4c 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
import scala.collection.mutable.Set
private[spark] class TimeStampedHashSet[A] extends Set[A] {
@@ -31,7 +31,7 @@ private[spark] class TimeStampedHashSet[A] extends Set[A] {
def iterator: Iterator[A] = {
val jIterator = internalMap.entrySet().iterator()
- JavaConversions.asScalaIterator(jIterator).map(_.getKey)
+ jIterator.asScala.map(_.getKey)
}
override def + (elem: A): Set[A] = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8313312226..2bab4af2e7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -25,7 +25,7 @@ import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
@@ -748,12 +748,12 @@ private[spark] object Utils extends Logging {
// getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
// on unix-like system. On windows, it returns in index order.
// It's more proper to pick ip address following system output order.
- val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
+ val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
for (ni <- reOrderedNetworkIFs) {
- val addresses = ni.getInetAddresses.toList
- .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+ val addresses = ni.getInetAddresses.asScala
+ .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq
if (addresses.nonEmpty) {
val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
// because of Inet6Address.toHostName may add interface at the end if it knows about it
@@ -1498,10 +1498,8 @@ private[spark] object Utils extends Logging {
* properties which have been set explicitly, as well as those for which only a default value
* has been defined. */
def getSystemProperties: Map[String, String] = {
- val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
- (key, System.getProperty(key))
-
- sysProps.toMap
+ System.getProperties.stringPropertyNames().asScala
+ .map(key => (key, System.getProperty(key))).toMap
}
/**
@@ -1812,7 +1810,8 @@ private[spark] object Utils extends Logging {
try {
val properties = new Properties()
properties.load(inReader)
- properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+ properties.stringPropertyNames().asScala.map(
+ k => (k, properties.getProperty(k).trim)).toMap
} catch {
case e: IOException =>
throw new SparkException(s"Failed when loading Spark properties from $filename", e)
@@ -1941,7 +1940,8 @@ private[spark] object Utils extends Logging {
return true
}
isBindCollision(e.getCause)
- case e: MultiException => e.getThrowables.exists(isBindCollision)
+ case e: MultiException =>
+ e.getThrowables.asScala.exists(isBindCollision)
case e: Exception => isBindCollision(e.getCause)
case _ => false
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index bdbca00a00..4939b600db 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.collection
-import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
+import scala.collection.JavaConverters._
import com.google.common.collect.{Ordering => GuavaOrdering}
@@ -34,6 +34,6 @@ private[spark] object Utils {
val ordering = new GuavaOrdering[T] {
override def compare(l: T, r: T): Int = ord.compare(l, r)
}
- collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
+ ordering.leastOf(input.asJava, num).iterator.asScala
}
}
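Finally, a round-trip sketch of the helper above (example ordering and data only): the Scala Iterator is handed to Guava as a Java iterator, and the java.util.List that leastOf returns comes back as a Scala Iterator.

    import scala.collection.JavaConverters._
    import com.google.common.collect.{Ordering => GuavaOrdering}

    val ord = new GuavaOrdering[Int] {
      override def compare(l: Int, r: Int): Int = Integer.compare(l, r)
    }
    val smallestTwo = ord.leastOf(Iterator(5, 1, 4, 2).asJava, 2).iterator.asScala.toList
    println(smallestTwo)   // List(1, 2)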