author    Sean Owen <sowen@cloudera.com>    2015-08-25 12:33:13 +0100
committer Sean Owen <sowen@cloudera.com>    2015-08-25 12:33:13 +0100
commit    69c9c177160e32a2fbc9b36ecc52156077fca6fc
tree      57345aaf19c3149038bfca5c4ddccf33d41bdd5b
parent    7f1e507bf7e82bff323c5dec3c1ee044687c4173
[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`. Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I can see, yet.

Author: Sean Owen <sowen@cloudera.com>

Closes #8033 from srowen/SPARK-9613.
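The change is mechanical but worth spelling out: `scala.collection.JavaConversions` converts between Java and Scala collections implicitly, while `scala.collection.JavaConverters` only adds explicit `.asScala`/`.asJava` extension methods, so every conversion becomes visible at the call site. A minimal sketch of the two styles (the map contents are illustrative):

    import java.util.{HashMap => JHashMap}
    // Old style (removed by this commit): the conversion happens silently.
    //   import scala.collection.JavaConversions._
    //   for ((k, v) <- javaMap) println(s"$k -> $v")
    // New style: the wrapping is spelled out with .asScala.
    import scala.collection.JavaConverters._

    object ConvertersSketch {
      def main(args: Array[String]): Unit = {
        val javaMap = new JHashMap[String, String]()
        javaMap.put("test", "header")
        for ((k, v) <- javaMap.asScala) println(s"$k -> $v")
      }
    }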
Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala          |  4
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala         |  3
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala         |  7
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala  |  6
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala            | 10
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala                |  8
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala     | 16
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala   |  8
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala          |  2
9 files changed, 29 insertions, 35 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
index 65c49c1315..48df27b268 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.flume
import java.io.{ObjectOutput, ObjectInput}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.util.Utils
import org.apache.spark.Logging
@@ -60,7 +60,7 @@ private[streaming] object EventTransformer extends Logging {
out.write(body)
val numHeaders = headers.size()
out.writeInt(numHeaders)
- for ((k, v) <- headers) {
+ for ((k, v) <- headers.asScala) {
val keyBuff = Utils.serialize(k.toString)
out.writeInt(keyBuff.length)
out.write(keyBuff)
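The hunk above shows the most common pattern in this commit: a `java.util.Map` of headers has no `foreach`, so the old code relied on an implicit wrapper and the new code asks for one explicitly with `.asScala` (a view, not a copy). A self-contained sketch with illustrative header data:

    import java.util.{LinkedHashMap => JLinkedHashMap}
    import scala.collection.JavaConverters._

    object HeaderIterationSketch {
      def main(args: Array[String]): Unit = {
        val headers = new JLinkedHashMap[CharSequence, CharSequence]()
        headers.put("test-0", "header")
        // .asScala wraps the Java map so the for-comprehension compiles.
        for ((k, v) <- headers.asScala) {
          println(s"key=${k.toString} value=${v.toString}")
        }
      }
    }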
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
index 88cc2aa3bf..b9d4e762ca 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.streaming.flume
-import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
@@ -155,7 +154,7 @@ private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends R
val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
var j = 0
while (j < events.size()) {
- val event = events(j)
+ val event = events.get(j)
val sparkFlumeEvent = new SparkFlumeEvent()
sparkFlumeEvent.event.setBody(event.getBody)
sparkFlumeEvent.event.setHeaders(event.getHeaders)
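This hunk needs no converter at all: `events(j)` only compiled because `JavaConversions` silently wrapped the `java.util.List`; calling the list's own `get(j)` drops the wrapper. A sketch of the same copy loop, with hypothetical string events in place of `SparkFlumeEvent`:

    import java.util.{ArrayList => JArrayList}
    import scala.collection.mutable.ArrayBuffer

    object IndexedCopySketch {
      def main(args: Array[String]): Unit = {
        val events = new JArrayList[String]()
        events.add("event-0")
        events.add("event-1")
        val buffer = new ArrayBuffer[String](events.size())
        var j = 0
        while (j < events.size()) {
          buffer += events.get(j) // plain Java List API, no implicit wrapper
          j += 1
        }
        println(buffer.mkString(", "))
      }
    }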
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 1e32a365a1..2bf99cb3cb 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
import java.util.concurrent.Executors
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.flume.source.avro.AvroSourceProtocol
@@ -99,7 +99,7 @@ class SparkFlumeEvent() extends Externalizable {
val numHeaders = event.getHeaders.size()
out.writeInt(numHeaders)
- for ((k, v) <- event.getHeaders) {
+ for ((k, v) <- event.getHeaders.asScala) {
val keyBuff = Utils.serialize(k.toString)
out.writeInt(keyBuff.length)
out.write(keyBuff)
@@ -127,8 +127,7 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
- events.foreach (event =>
- receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
+ events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
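Traversing a `java.util.List` gets the same treatment: one explicit `.asScala` in front of `foreach`. A sketch with a hypothetical `store` standing in for the receiver:

    import java.util.{Arrays => JArrays}
    import scala.collection.JavaConverters._

    object BatchStoreSketch {
      def store(s: String): Unit = println(s"stored: $s")

      def main(args: Array[String]): Unit = {
        val events: java.util.List[String] = JArrays.asList("e1", "e2")
        // .asScala exposes foreach on the Java list without copying it.
        events.asScala.foreach(event => store(event))
      }
    }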
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 583e7dca31..0bc46209b8 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.util.concurrent.{LinkedBlockingQueue, Executors}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -94,9 +94,7 @@ private[streaming] class FlumePollingReceiver(
override def onStop(): Unit = {
logInfo("Shutting down Flume Polling Receiver")
receiverExecutor.shutdownNow()
- connections.foreach(connection => {
- connection.transceiver.close()
- })
+ connections.asScala.foreach(_.transceiver.close())
channelFactory.releaseExternalResources()
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
index 9d9c3b1894..70018c86f9 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
-import java.util.{List => JList}
+import java.util.Collections
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.base.Charsets.UTF_8
import org.apache.avro.ipc.NettyTransceiver
@@ -59,13 +59,13 @@ private[flume] class FlumeTestUtils {
}
/** Send data to the flume receiver */
- def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+ def writeInput(input: Seq[String], enableCompression: Boolean): Unit = {
val testAddress = new InetSocketAddress("localhost", testPort)
val inputEvents = input.map { item =>
val event = new AvroFlumeEvent
event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
- event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ event.setHeaders(Collections.singletonMap("test", "header"))
event
}
@@ -88,7 +88,7 @@ private[flume] class FlumeTestUtils {
}
// Send data
- val status = client.appendBatch(inputEvents.toList)
+ val status = client.appendBatch(inputEvents.asJava)
if (status != avro.Status.OK) {
throw new AssertionError("Sent events unsuccessfully")
}
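FlumeTestUtils exercises the opposite direction: a Scala `Seq` is handed to a Java API with `.asJava`, and the one-entry header map, previously an implicitly converted Scala `Map`, becomes `Collections.singletonMap`, a plain Java value. A sketch of both, with a hypothetical `appendBatch` standing in for the Avro client:

    import java.util.Collections
    import scala.collection.JavaConverters._

    object ToJavaSketch {
      // Hypothetical stand-in for a Java API like client.appendBatch.
      def appendBatch(events: java.util.List[String]): Boolean = !events.isEmpty

      def main(args: Array[String]): Unit = {
        val inputEvents = Seq("a", "b", "c")
        println(appendBatch(inputEvents.asJava)) // Scala Seq -> java.util.List view

        // One-entry Java map with no Scala conversion machinery at all.
        val header: java.util.Map[String, String] =
          Collections.singletonMap("test", "header")
        println(header)
      }
    }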
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index a65a9b921a..c719b80aca 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.util.{List => JList, Map => JMap}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import org.apache.spark.api.java.function.PairFunction
import org.apache.spark.api.python.PythonRDD
@@ -268,8 +268,8 @@ private[flume] class FlumeUtilsPythonHelper {
maxBatchSize: Int,
parallelism: Int
): JavaPairDStream[Array[Byte], Array[Byte]] = {
- assert(hosts.length == ports.length)
- val addresses = hosts.zip(ports).map {
+ assert(hosts.size() == ports.size())
+ val addresses = hosts.asScala.zip(ports.asScala).map {
case (host, port) => new InetSocketAddress(host, port)
}
val dstream = FlumeUtils.createPollingStream(
@@ -286,7 +286,7 @@ private object FlumeUtilsPythonHelper {
val output = new DataOutputStream(byteStream)
try {
output.writeInt(map.size)
- map.foreach { kv =>
+ map.asScala.foreach { kv =>
PythonRDD.writeUTF(kv._1.toString, output)
PythonRDD.writeUTF(kv._2.toString, output)
}
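Two details in this file: the length check now uses the Java `size()` method (the implicit `.length` is gone), and the host/port lists are zipped after converting each with `.asScala`. A sketch of that shape with illustrative addresses:

    import java.net.InetSocketAddress
    import java.util.{Arrays => JArrays}
    import scala.collection.JavaConverters._

    object ZipHostsPortsSketch {
      def main(args: Array[String]): Unit = {
        val hosts: java.util.List[String] = JArrays.asList("localhost", "localhost")
        val ports: java.util.List[Int] = JArrays.asList(4141, 4142)
        assert(hosts.size() == ports.size()) // Java size(), not implicit .length
        val addresses = hosts.asScala.zip(ports.asScala).map {
          case (host, port) => new InetSocketAddress(host, port)
        }
        addresses.foreach(println)
      }
    }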
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 91d63d49db..a2ab320957 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -18,9 +18,8 @@
package org.apache.spark.streaming.flume
import java.util.concurrent._
-import java.util.{List => JList, Map => JMap}
+import java.util.{Map => JMap, Collections}
-import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Charsets.UTF_8
@@ -77,7 +76,7 @@ private[flume] class PollingFlumeTestUtils {
/**
* Start 2 sinks and return the ports
*/
- def startMultipleSinks(): JList[Int] = {
+ def startMultipleSinks(): Seq[Int] = {
channels.clear()
sinks.clear()
@@ -138,8 +137,7 @@ private[flume] class PollingFlumeTestUtils {
/**
* A Python-friendly method to assert the output
*/
- def assertOutput(
- outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+ def assertOutput(outputHeaders: Seq[JMap[String, String]], outputBodies: Seq[String]): Unit = {
require(outputHeaders.size == outputBodies.size)
val eventSize = outputHeaders.size
if (eventSize != totalEventsPerChannel * channels.size) {
@@ -149,12 +147,12 @@ private[flume] class PollingFlumeTestUtils {
var counter = 0
for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
val eventBodyToVerify = s"${channels(k).getName}-$i"
- val eventHeaderToVerify: JMap[String, String] = Map[String, String](s"test-$i" -> "header")
+ val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
var found = false
var j = 0
while (j < eventSize && !found) {
- if (eventBodyToVerify == outputBodies.get(j) &&
- eventHeaderToVerify == outputHeaders.get(j)) {
+ if (eventBodyToVerify == outputBodies(j) &&
+ eventHeaderToVerify == outputHeaders(j)) {
found = true
counter += 1
}
@@ -195,7 +193,7 @@ private[flume] class PollingFlumeTestUtils {
tx.begin()
for (j <- 0 until eventsPerBatch) {
channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8),
- Map[String, String](s"test-$t" -> "header")))
+ Collections.singletonMap(s"test-$t", "header")))
t += 1
}
tx.commit()
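Here the commit goes further than swapping converters: the test helpers change their signatures from `java.util.List` to Scala `Seq`, so callers index with `outputBodies(j)` rather than `.get(j)`, and the expected header maps are built with `Collections.singletonMap`. A reduced sketch of the new call shape:

    import java.util.{Map => JMap, Collections}

    object AssertOutputSketch {
      // Scala-side signature: callers pass Seq and index with apply().
      def assertOutput(headers: Seq[JMap[String, String]], bodies: Seq[String]): Unit = {
        require(headers.size == bodies.size)
        var j = 0
        while (j < bodies.size) {
          println(s"${bodies(j)} -> ${headers(j)}") // Seq uses (j), not .get(j)
          j += 1
        }
      }

      def main(args: Array[String]): Unit = {
        val header = Collections.singletonMap("test-0", "header")
        assertOutput(Seq(header), Seq("channel-0"))
      }
    }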
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index d5f9a0aa38..ff2fb8eed2 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -116,9 +116,9 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
// The eventually is required to ensure that all data in the batch has been processed.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
val flattenOutputBuffer = outputBuffer.flatten
- val headers = flattenOutputBuffer.map(_.event.getHeaders.map {
- case kv => (kv._1.toString, kv._2.toString)
- }).map(mapAsJavaMap)
+ val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
+ case (key, value) => (key.toString, value.toString)
+ }).map(_.asJava)
val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
utils.assertOutput(headers, bodies)
}
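The test round-trips each event's headers: Java map to Scala with `.asScala`, stringify keys and values, then back to Java with `.asJava`, replacing the `mapAsJavaMap` helper that `JavaConversions` also exported. A sketch of that round trip with illustrative data:

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._

    object HeaderRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val raw = new JHashMap[CharSequence, CharSequence]()
        raw.put("test-0", "header")
        // Java map -> Scala view -> stringified copy -> back to a Java map.
        val headers: java.util.Map[String, String] = raw.asScala.map {
          case (key, value) => (key.toString, value.toString)
        }.asJava
        println(headers)
      }
    }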
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 5bc4cdf653..5ffb60bd60 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.flume
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps