aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src
diff options
context:
space:
mode:
Diffstat (limited to 'external/kinesis-asl/src')
-rw-r--r--external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala3
-rw-r--r--external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala3
2 files changed, 4 insertions, 2 deletions
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ace453ee9..026387ed65 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -242,7 +243,7 @@ private[kinesis] class SimpleDataGenerator(
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
+ val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
.withData(data)
.withPartitionKey(str)
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index fdb270eaad..0b455e574e 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -51,7 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
+ val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
val future = producer.addUserRecord(streamName, str, data)
val kinesisCallBack = new FutureCallback[UserRecordResult]() {
override def onFailure(t: Throwable): Unit = {} // do nothing