Diffstat (limited to 'streaming/src')
 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala       | 5 ++---
 streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 6 ++----
 2 files changed, 4 insertions(+), 7 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b524af9578..6046426fdf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -579,8 +579,7 @@ class StreamingContext private[streaming] (
               sparkContext.setCallSite(startSite.get)
               sparkContext.clearJobGroup()
               sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-              savedProperties.set(SerializationUtils.clone(
-                sparkContext.localProperties.get()).asInstanceOf[Properties])
+              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
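
The dropped cast is explained by the API difference between the two libraries: commons-lang 2.x declares SerializationUtils.clone(Serializable) returning Object, while commons-lang3 makes it generic, <T extends Serializable> T clone(T), so the result is already statically typed as Properties. A minimal sketch of the difference (the demo object and property key are illustrative, not part of the patch):

import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

object CloneTypingDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("spark.jobGroup.id", "demo-group")

    // lang3: clone[T <: Serializable](obj: T): T -- the result is already a
    // Properties, so the lang 2.x-era `.asInstanceOf[Properties]` cast goes away.
    val copy: Properties = SerializationUtils.clone(props)

    // clone() is a serialize/deserialize round trip, i.e. a deep copy:
    // mutating the original leaves the snapshot untouched.
    props.setProperty("spark.jobGroup.id", "mutated")
    assert(copy.getProperty("spark.jobGroup.id") == "demo-group")
  }
}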
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index ac18f73ea8..79d6254eb3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
@@ -219,8 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       val oldProps = ssc.sparkContext.getLocalProperties
      try {
-        ssc.sparkContext.setLocalProperties(
-          SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
+        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
         val formattedTime = UIUtils.formatBatchTime(
           job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
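
This second hunk sits inside the per-batch job handler: it installs a deep copy of the Properties snapshot that StreamingContext.start() saved above, and the oldProps captured before the try block are put back once the job finishes. A simplified, self-contained sketch of that snapshot/restore pattern (LocalPropsPattern and runWithSnapshot are hypothetical names, not Spark API):

import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

object LocalPropsPattern {
  // Stand-in for SparkContext's thread-local job properties.
  private val localProps = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
  }

  // Install a deep copy of `saved` for the duration of `job`, then restore
  // whatever the calling thread had before. Cloning the snapshot means
  // concurrent jobs cannot observe each other's property mutations.
  def runWithSnapshot(saved: Properties)(job: => Unit): Unit = {
    val oldProps = localProps.get()
    try {
      localProps.set(SerializationUtils.clone(saved))
      job
    } finally {
      localProps.set(oldProps)
    }
  }

  def main(args: Array[String]): Unit = {
    val saved = new Properties()
    saved.setProperty("spark.job.description", "batch job")
    runWithSnapshot(saved) {
      assert(localProps.get().getProperty("spark.job.description") == "batch job")
    }
  }
}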