author     Sean Owen <sowen@cloudera.com>  2015-08-25 12:33:13 +0100
committer  Sean Owen <sowen@cloudera.com>  2015-08-25 12:33:13 +0100
commit     69c9c177160e32a2fbc9b36ecc52156077fca6fc (patch)
tree       57345aaf19c3149038bfca5c4ddccf33d41bdd5b /streaming
parent     7f1e507bf7e82bff323c5dec3c1ee044687c4173 (diff)
[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`.

Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I can see, yet.

Author: Sean Owen <sowen@cloudera.com>

Closes #8033 from srowen/SPARK-9613.
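To make the change concrete, here is a minimal, self-contained sketch (not taken from this patch; the object and method names are illustrative only) of the pattern applied throughout the files below: instead of relying on the silent implicit conversions pulled in by scala.collection.JavaConversions._, the code imports scala.collection.JavaConverters._ and calls .asScala / .asJava explicitly at each Java/Scala collection boundary, which keeps the conversions visible and returns lightweight wrappers rather than copies.

import java.util.{Arrays, List => JList}

import scala.collection.JavaConverters._

// Illustrative only: shows the explicit .asScala/.asJava style this patch adopts.
object JavaConvertersSketch {
  // A Java-friendly method: accepts and returns java.util.List, but does its
  // work on a Scala collection via explicit, wrapper-based conversions.
  def lengths(words: JList[String]): JList[Int] =
    words.asScala.map(_.length).asJava

  def main(args: Array[String]): Unit = {
    val input: JList[String] = Arrays.asList("spark", "streaming")
    println(lengths(input)) // prints [5, 9]
  }
}

With the old JavaConversions import, the same method body would compile with no explicit calls because the Java list would be converted implicitly; the point of the migration is to make those boundaries visible in the code.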
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala      | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala      | 28
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 32
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala      |  5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala             |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala        |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   |  4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala                  | 24
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala       |  4
10 files changed, 55 insertions, 68 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 214cd80108..edfa474677 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,11 +17,10 @@
package org.apache.spark.streaming.api.java
-import java.util
import java.lang.{Long => JLong}
import java.util.{List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -145,8 +144,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* an array.
*/
def glom(): JavaDStream[JList[T]] =
- new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
-
+ new JavaDStream(dstream.glom().map(_.toSeq.asJava))
/** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
@@ -191,7 +189,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -204,7 +202,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
}
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -282,7 +280,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return all the RDDs between 'fromDuration' to 'toDuration' (both included)
*/
def slice(fromTime: Time, toTime: Time): JList[R] = {
- new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq)
+ dstream.slice(fromTime, toTime).map(wrapRDD).asJava
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 26383e4201..e2aec6c2f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.api.java
import java.lang.{Long => JLong, Iterable => JIterable}
import java.util.{List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -116,14 +116,14 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): JavaPairDStream[K, JIterable[V]] =
- dstream.groupByKey().mapValues(asJavaIterable _)
+ dstream.groupByKey().mapValues(_.asJava)
/**
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairDStream[K, JIterable[V]] =
- dstream.groupByKey(numPartitions).mapValues(asJavaIterable _)
+ dstream.groupByKey(numPartitions).mapValues(_.asJava)
/**
* Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
@@ -132,7 +132,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JIterable[V]] =
- dstream.groupByKey(partitioner).mapValues(asJavaIterable _)
+ dstream.groupByKey(partitioner).mapValues(_.asJava)
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
@@ -197,7 +197,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JIterable[V]] = {
- dstream.groupByKeyAndWindow(windowDuration).mapValues(asJavaIterable _)
+ dstream.groupByKeyAndWindow(windowDuration).mapValues(_.asJava)
}
/**
@@ -212,7 +212,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JIterable[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(asJavaIterable _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(_.asJava)
}
/**
@@ -228,8 +228,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[K, JIterable[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
- .mapValues(asJavaIterable _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(_.asJava)
}
/**
@@ -248,8 +247,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
slideDuration: Duration,
partitioner: Partitioner
): JavaPairDStream[K, JIterable[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
- .mapValues(asJavaIterable _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(_.asJava)
}
/**
@@ -431,7 +429,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
(Seq[V], Option[S]) => Option[S] = {
val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
- val list: JList[V] = values
+ val list: JList[V] = values.asJava
val scalaState: Optional[S] = JavaUtils.optionToOptional(state)
val result: Optional[S] = in.apply(list, scalaState)
result.isPresent match {
@@ -539,7 +537,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
- dstream.cogroup(other.dstream).mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
+ dstream.cogroup(other.dstream).mapValues(t => (t._1.asJava, t._2.asJava))
}
/**
@@ -551,8 +549,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
numPartitions: Int
): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
- dstream.cogroup(other.dstream, numPartitions)
- .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
+ dstream.cogroup(other.dstream, numPartitions).mapValues(t => (t._1.asJava, t._2.asJava))
}
/**
@@ -564,8 +561,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner: Partitioner
): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
- dstream.cogroup(other.dstream, partitioner)
- .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
+ dstream.cogroup(other.dstream, partitioner).mapValues(t => (t._1.asJava, t._2.asJava))
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 35cc3ce5cf..13f371f296 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -21,7 +21,7 @@ import java.lang.{Boolean => JBoolean}
import java.io.{Closeable, InputStream}
import java.util.{List => JList, Map => JMap}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
@@ -115,7 +115,13 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
sparkHome: String,
jars: Array[String],
environment: JMap[String, String]) =
- this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
+ this(new StreamingContext(
+ master,
+ appName,
+ batchDuration,
+ sparkHome,
+ jars,
+ environment.asScala))
/**
* Create a JavaStreamingContext using an existing JavaSparkContext.
@@ -197,7 +203,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaReceiverInputDStream[T] = {
- def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).toIterator
+ def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).iterator().asScala
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
@@ -432,7 +438,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
- sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ sQueue.enqueue(queue.asScala.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue)
}
@@ -456,7 +462,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
- sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ sQueue.enqueue(queue.asScala.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime)
}
@@ -481,7 +487,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
- sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ sQueue.enqueue(queue.asScala.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
}
@@ -500,7 +506,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
- val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
+ val dstreams: Seq[DStream[T]] = (Seq(first) ++ rest.asScala).map(_.dstream)
implicit val cm: ClassTag[T] = first.classTag
ssc.union(dstreams)(cm)
}
@@ -512,7 +518,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
first: JavaPairDStream[K, V],
rest: JList[JavaPairDStream[K, V]]
): JavaPairDStream[K, V] = {
- val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
+ val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.dstream)
implicit val cm: ClassTag[(K, V)] = first.classTag
implicit val kcm: ClassTag[K] = first.kManifest
implicit val vcm: ClassTag[V] = first.vManifest
@@ -534,12 +540,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
): JavaDStream[T] = {
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val scalaDStreams = dstreams.map(_.dstream).toSeq
val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
- val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+ val jrdds = rdds.map(JavaRDD.fromRDD(_)).asJava
transformFunc.call(jrdds, time).rdd
}
- ssc.transform(scalaDStreams, scalaTransformFunc)
+ ssc.transform(dstreams.asScala.map(_.dstream).toSeq, scalaTransformFunc)
}
/**
@@ -559,12 +564,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
- val scalaDStreams = dstreams.map(_.dstream).toSeq
val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
- val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+ val jrdds = rdds.map(JavaRDD.fromRDD(_)).asJava
transformFunc.call(jrdds, time).rdd
}
- ssc.transform(scalaDStreams, scalaTransformFunc)
+ ssc.transform(dstreams.asScala.map(_.dstream).toSeq, scalaTransformFunc)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index d06401245f..2c373640d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -20,14 +20,13 @@ package org.apache.spark.streaming.api.python
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.lang.reflect.Proxy
import java.util.{ArrayList => JArrayList, List => JList}
-import scala.collection.JavaConversions._
+
import scala.collection.JavaConverters._
import scala.language.existentials
import py4j.GatewayServer
import org.apache.spark.api.java._
-import org.apache.spark.api.python._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Interval, Duration, Time}
@@ -161,7 +160,7 @@ private[python] object PythonDStream {
*/
def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = {
val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
- rdds.forall(queue.add(_))
+ rdds.asScala.foreach(queue.add)
queue
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 554aae0117..2252e28f22 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.annotation.DeveloperApi
@@ -144,12 +144,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* for being used in the corresponding InputDStream.
*/
def store(dataIterator: java.util.Iterator[T], metadata: Any) {
- supervisor.pushIterator(dataIterator, Some(metadata), None)
+ supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
}
/** Store an iterator of received data as a data block into Spark's memory. */
def store(dataIterator: java.util.Iterator[T]) {
- supervisor.pushIterator(dataIterator, None, None)
+ supervisor.pushIterator(dataIterator.asScala, None, None)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 6d4cdc4aa6..0cd39594ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.scheduler
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.util.{Failure, Success}
import org.apache.spark.Logging
@@ -128,7 +128,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
def getPendingTimes(): Seq[Time] = {
- jobSets.keySet.toSeq
+ jobSets.asScala.keys.toSeq
}
def reportError(msg: String, e: Throwable) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 53b96d51c9..f2711d1355 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler
import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.implicitConversions
@@ -196,8 +197,7 @@ private[streaming] class ReceivedBlockTracker(
writeAheadLogOption.foreach { writeAheadLog =>
logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
- import scala.collection.JavaConversions._
- writeAheadLog.readAll().foreach { byteBuffer =>
+ writeAheadLog.readAll().asScala.foreach { byteBuffer =>
logTrace("Recovering record " + byteBuffer)
Utils.deserialize[ReceivedBlockTrackerLogEvent](
byteBuffer.array, Thread.currentThread().getContextClassLoader) match {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index fe6328b1ce..9f4a4d6806 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
@@ -118,7 +119,6 @@ private[streaming] class FileBasedWriteAheadLog(
* hence the implementation is kept simple.
*/
def readAll(): JIterator[ByteBuffer] = synchronized {
- import scala.collection.JavaConversions._
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
@@ -126,7 +126,7 @@ private[streaming] class FileBasedWriteAheadLog(
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
- } flatMap { x => x }
+ }.flatten.asJava
}
/**
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index bb80bff6dc..57b50bdfd6 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -17,16 +17,13 @@
package org.apache.spark.streaming
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.{List => JList}
+
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import java.util.{List => JList}
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming._
-import java.util.ArrayList
-import collection.JavaConversions._
import org.apache.spark.api.java.JavaRDDLike
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaDStreamLike, JavaDStream, JavaStreamingContext}
/** Exposes streaming test functionality in a Java-friendly way. */
trait JavaTestBase extends TestSuiteBase {
@@ -39,7 +36,7 @@ trait JavaTestBase extends TestSuiteBase {
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
- val seqData = data.map(Seq(_:_*))
+ val seqData = data.asScala.map(_.asScala)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
@@ -72,9 +69,7 @@ trait JavaTestBase extends TestSuiteBase {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
- val out = new ArrayList[JList[V]]()
- res.map(entry => out.append(new ArrayList[V](entry)))
- out
+ res.map(_.asJava).asJava
}
/**
@@ -90,12 +85,7 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
- val out = new ArrayList[JList[JList[V]]]()
- res.map{entry =>
- val lists = entry.map(new ArrayList[V](_))
- out.append(new ArrayList[JList[V]](lists))
- }
- out
+ res.map(entry => entry.map(_.asJava).asJava).asJava
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 325ff7c74c..5e49fd0076 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -20,6 +20,7 @@ import java.io._
import java.nio.ByteBuffer
import java.util
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
@@ -417,9 +418,8 @@ object WriteAheadLogSuite {
/** Read all the data in the log file in a directory using the WriteAheadLog class. */
def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
- import scala.collection.JavaConversions._
val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
- val data = wal.readAll().map(byteBufferToString).toSeq
+ val data = wal.readAll().asScala.map(byteBufferToString).toSeq
wal.close()
data
}