author    Sean Owen <sowen@cloudera.com>    2015-08-25 12:33:13 +0100
committer Sean Owen <sowen@cloudera.com>    2015-08-25 12:33:13 +0100
commit    69c9c177160e32a2fbc9b36ecc52156077fca6fc (patch)
tree      57345aaf19c3149038bfca5c4ddccf33d41bdd5b /extras
parent    7f1e507bf7e82bff323c5dec3c1ee044687c4173 (diff)
[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`.

Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I can see, yet.

Author: Sean Owen <sowen@cloudera.com>

Closes #8033 from srowen/SPARK-9613.
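The pattern is the same in every hunk below. As a minimal standalone sketch (not code from the patch itself), this is the explicit JavaConverters style the commit adopts, using a throwaway list rather than any Spark class:

import java.util.{List => JList}

import scala.collection.JavaConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    val javaList: JList[String] = java.util.Arrays.asList("a", "b", "c")

    // JavaConverters makes each conversion explicit at the call site,
    // where JavaConversions applied it silently via an implicit view.
    val scalaSeq: Seq[String] = javaList.asScala

    // The reverse direction is just as explicit.
    val backToJava: JList[String] = scalaSeq.asJava

    println(scalaSeq.mkString(", "))
    println(backToJava)
  }
}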
Diffstat (limited to 'extras')
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala   4
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala         4
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala        3
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala   12
4 files changed, 12 insertions, 11 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index a003ddf325..5d32fa699a 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kinesis
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
@@ -213,7 +213,7 @@ class KinesisSequenceRangeIterator(
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
}
- (getRecordsResult.getRecords.iterator(), getRecordsResult.getNextShardIterator)
+ (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator)
}
/**
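The hunk above swaps a silent implicit for an explicit, lazy wrapper around the Java iterator. A self-contained sketch of the same conversion, where RecordsResult is a hypothetical stand-in for the AWS SDK's GetRecordsResult (the real class comes from the Kinesis client library):

import scala.collection.JavaConverters._

// Hypothetical stand-in, only to give the conversion something to hang off.
class RecordsResult(records: java.util.List[String]) {
  def getRecords: java.util.List[String] = records
}

object IteratorSketch {
  def main(args: Array[String]): Unit = {
    val result = new RecordsResult(java.util.Arrays.asList("r1", "r2"))

    // .asScala wraps the Java Iterator lazily; nothing is copied, so the
    // explicit call costs the same as the old implicit conversion.
    val it: Iterator[String] = result.getRecords.iterator().asScala
    it.foreach(println)
  }
}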
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 22324e821c..6e0988c1af 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.kinesis
import java.util.UUID
-import scala.collection.JavaConversions.asScalaIterator
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
@@ -202,7 +202,7 @@ private[kinesis] class KinesisReceiver(
/** Add records of the given shard to the current block being generated */
private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
if (records.size > 0) {
- val dataIterator = records.iterator().map { record =>
+ val dataIterator = records.iterator().asScala.map { record =>
val byteBuffer = record.getData()
val byteArray = new Array[Byte](byteBuffer.remaining())
byteBuffer.get(byteArray)
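As above, a runnable sketch of this addRecords pattern: convert the Java iterator explicitly, then drain each record's ByteBuffer into a fresh array. A java.util.List of plain ByteBuffers stands in for the real Kinesis Record objects:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

object AddRecordsSketch {
  def main(args: Array[String]): Unit = {
    val records: java.util.List[ByteBuffer] = java.util.Arrays.asList(
      ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)))

    // Explicit .asScala on the iterator, then copy each buffer's
    // remaining bytes into a byte array, as addRecords does.
    val dataIterator = records.iterator().asScala.map { byteBuffer =>
      val byteArray = new Array[Byte](byteBuffer.remaining())
      byteBuffer.get(byteArray)
      byteArray
    }
    dataIterator.foreach(a => println(new String(a, StandardCharsets.UTF_8)))
  }
}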
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index c8eec13ec7..634bf94521 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Random, Success, Try}
@@ -115,7 +116,7 @@ private[kinesis] class KinesisTestUtils extends Logging {
* Expose a Python friendly API.
*/
def pushData(testData: java.util.List[Int]): Unit = {
- pushData(scala.collection.JavaConversions.asScalaBuffer(testData))
+ pushData(testData.asScala)
}
def deleteStream(): Unit = {
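A sketch of the overload pattern this hunk touches, assuming a simplified pushData signature (the real KinesisTestUtils method writes to a Kinesis stream; this one just sums): the Java-facing overload now converts explicitly with .asScala instead of calling JavaConversions.asScalaBuffer.

import scala.collection.JavaConverters._

object PushDataSketch {
  // Scala-facing method; simplified body for demonstration only.
  def pushData(testData: Seq[Int]): Unit = println(testData.sum)

  // Python/Java-friendly overload: an explicit .asScala replaces the
  // removed JavaConversions.asScalaBuffer call.
  def pushData(testData: java.util.List[Int]): Unit =
    pushData(testData.asScala)

  def main(args: Array[String]): Unit =
    pushData(java.util.Arrays.asList(1, 2, 3))
}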
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index ceb135e065..3d136aec2e 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -17,10 +17,10 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.Arrays
-import scala.collection.JavaConversions.seqAsJavaList
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import com.amazonaws.services.kinesis.clientlibrary.exceptions._
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
@@ -47,10 +47,10 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
val someSeqNum = Some(seqNum)
val record1 = new Record()
- record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
+ record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
val record2 = new Record()
- record2.setData(ByteBuffer.wrap("Learning Spark".getBytes()))
- val batch = List[Record](record1, record2)
+ record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
+ val batch = Arrays.asList(record1, record2)
var receiverMock: KinesisReceiver = _
var checkpointerMock: IRecordProcessorCheckpointer = _
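Taken together, the test changes pin the charset and build the java.util.List directly. A minimal sketch, independent of the Kinesis Record class:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Arrays

object FixtureSketch {
  def main(args: Array[String]): Unit = {
    // A bare getBytes() uses the platform default charset, making the
    // fixture machine-dependent; naming UTF-8 pins it down.
    val record1 = ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8))
    val record2 = ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8))

    // Arrays.asList builds the java.util.List directly, so no implicit
    // seqAsJavaList conversion from a Scala List is needed.
    val batch: java.util.List[ByteBuffer] = Arrays.asList(record1, record2)
    println(batch.size())
  }
}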