diff options
author | Sean Owen <sowen@cloudera.com> | 2015-08-25 12:33:13 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-08-25 12:33:13 +0100 |
commit | 69c9c177160e32a2fbc9b36ecc52156077fca6fc (patch) | |
tree | 57345aaf19c3149038bfca5c4ddccf33d41bdd5b /extras/kinesis-asl/src/main | |
parent | 7f1e507bf7e82bff323c5dec3c1ee044687c4173 (diff) | |
download | spark-69c9c177160e32a2fbc9b36ecc52156077fca6fc.tar.gz spark-69c9c177160e32a2fbc9b36ecc52156077fca6fc.tar.bz2 spark-69c9c177160e32a2fbc9b36ecc52156077fca6fc.zip |
[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`
Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet.
Author: Sean Owen <sowen@cloudera.com>
Closes #8033 from srowen/SPARK-9613.
Diffstat (limited to 'extras/kinesis-asl/src/main')
3 files changed, 6 insertions, 5 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index a003ddf325..5d32fa699a 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kinesis -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} @@ -213,7 +213,7 @@ class KinesisSequenceRangeIterator( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) } - (getRecordsResult.getRecords.iterator(), getRecordsResult.getNextShardIterator) + (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator) } /** diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 22324e821c..6e0988c1af 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kinesis import java.util.UUID -import scala.collection.JavaConversions.asScalaIterator +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal @@ -202,7 +202,7 @@ private[kinesis] class KinesisReceiver( /** Add records of the given shard to the current block being generated */ private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = { if (records.size > 0) { - val dataIterator = records.iterator().map { record => + val dataIterator = records.iterator().asScala.map { record => val byteBuffer = record.getData() val byteArray = new Array[Byte](byteBuffer.remaining()) byteBuffer.get(byteArray) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index c8eec13ec7..634bf94521 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Random, Success, Try} @@ -115,7 +116,7 @@ private[kinesis] class KinesisTestUtils extends Logging { * Expose a Python friendly API. */ def pushData(testData: java.util.List[Int]): Unit = { - pushData(scala.collection.JavaConversions.asScalaBuffer(testData)) + pushData(testData.asScala) } def deleteStream(): Unit = { |