From 69c9c177160e32a2fbc9b36ecc52156077fca6fc Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 25 Aug 2015 12:33:13 +0100 Subject: [SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters Replace `JavaConversions` implicits with `JavaConverters` Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet. Author: Sean Owen Closes #8033 from srowen/SPARK-9613. --- .../spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 4 ++-- .../org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 4 ++-- .../apache/spark/streaming/kinesis/KinesisTestUtils.scala | 3 ++- .../spark/streaming/kinesis/KinesisReceiverSuite.scala | 12 ++++++------ 4 files changed, 12 insertions(+), 11 deletions(-) (limited to 'extras') diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index a003ddf325..5d32fa699a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kinesis -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} @@ -213,7 +213,7 @@ class KinesisSequenceRangeIterator( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) } - (getRecordsResult.getRecords.iterator(), getRecordsResult.getNextShardIterator) + (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator) } /** diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 22324e821c..6e0988c1af 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kinesis import java.util.UUID -import scala.collection.JavaConversions.asScalaIterator +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal @@ -202,7 +202,7 @@ private[kinesis] class KinesisReceiver( /** Add records of the given shard to the current block being generated */ private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = { if (records.size > 0) { - val dataIterator = records.iterator().map { record => + val dataIterator = records.iterator().asScala.map { record => val byteBuffer = record.getData() val byteArray = new Array[Byte](byteBuffer.remaining()) byteBuffer.get(byteArray) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index c8eec13ec7..634bf94521 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Random, Success, Try} @@ -115,7 +116,7 @@ private[kinesis] class KinesisTestUtils extends Logging { * Expose a Python friendly API. */ def pushData(testData: java.util.List[Int]): Unit = { - pushData(scala.collection.JavaConversions.asScalaBuffer(testData)) + pushData(testData.asScala) } def deleteStream(): Unit = { diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index ceb135e065..3d136aec2e 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -17,10 +17,10 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.Arrays -import scala.collection.JavaConversions.seqAsJavaList - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} +import com.amazonaws.services.kinesis.clientlibrary.exceptions._ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record @@ -47,10 +47,10 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft val someSeqNum = Some(seqNum) val record1 = new Record() - record1.setData(ByteBuffer.wrap("Spark In Action".getBytes())) + record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8))) val record2 = new Record() - record2.setData(ByteBuffer.wrap("Learning Spark".getBytes())) - val batch = List[Record](record1, record2) + record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8))) + val batch = Arrays.asList(record1, record2) var receiverMock: KinesisReceiver = _ var checkpointerMock: IRecordProcessorCheckpointer = _ -- cgit v1.2.3