aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-31 18:26:23 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-31 18:26:23 -0500
commit42bcfb2bb2b532dc12e13d3cfc1b4556bbb2c43c (patch)
treeeaf9b5aa4e3632cb5ea5ddcbb865624f33fa4de3 /streaming
parentba9338f104ccc71d4f342a3f96624a9b36895f48 (diff)
downloadspark-42bcfb2bb2b532dc12e13d3cfc1b4556bbb2c43c.tar.gz
spark-42bcfb2bb2b532dc12e13d3cfc1b4556bbb2c43c.tar.bz2
spark-42bcfb2bb2b532dc12e13d3cfc1b4556bbb2c43c.zip
Fix two compile errors introduced in merge
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala2
2 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index af443279a1..ca0115f90e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -64,7 +64,7 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Confi
val file = new Path(checkpointDir, "graph")
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
- val compressionCodec = CompressionCodec.createCodec()
+ val compressionCodec = CompressionCodec.createCodec(conf)
// The file to which we actually write - and then "move" to file
val writeFile = new Path(file.getParent, file.getName + ".next")
// The file to which existing checkpoint is backed up (i.e. "moved")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index e448211732..ab60a8166e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -50,7 +50,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val clock = {
val clockClass = ssc.sc.conf.getOrElse(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ Class.forName(clockClass).newInstance().asInstanceOf[Clock]
}
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))