author     Jakob Odersky <jakob@odersky.com>    2016-02-05 19:00:12 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-05 19:00:12 -0800
commit     6883a5120c6b3a806024dfad03b8bf7371c6c0eb (patch)
tree       06a98270f4f56b295c18931cdcb8cd3490683181
parent     875f5079290a06c12d44de657dfeabc913e4a303 (diff)
[SPARK-13171][CORE] Replace future calls with Future
Trivial search-and-replace to eliminate deprecation warnings in Scala 2.11. Also works with 2.10.

Author: Jakob Odersky <jakob@odersky.com>

Closes #11085 from jodersky/SPARK-13171.
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala                                      | 8
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                                           | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala                                           | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala                       | 6
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala           | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala                      | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala                 | 2
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 6
8 files changed, 23 insertions, 23 deletions
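For context: the lowercase helpers `scala.concurrent.future` and `scala.concurrent.promise` are thin aliases for `Future.apply` and `Promise.apply`, deprecated in Scala 2.11, so the replacement is purely mechanical and behavior-preserving. A minimal standalone sketch (not part of this patch) illustrating the before/after:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureRenameSketch extends App {
  // Deprecated in 2.11 (compiles with a warning; fine on 2.10):
  //   val f = scala.concurrent.future { 21 * 2 }
  //   val p = scala.concurrent.promise[String]()

  // Replacement: the companion-object apply methods do exactly the same thing.
  val f: Future[Int] = Future { 21 * 2 }
  val p: Promise[String] = Promise[String]()
  p.success("done")

  println(Await.result(f, 1.second))        // 42
  println(Await.result(p.future, 1.second)) // done
}
```

Because `Future.apply` and `Promise.apply` already exist on 2.10, the same sources compile warning-free on both Scala versions, as the commit message notes.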
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c0ede4b7c8..15d220d01b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -22,7 +22,7 @@ import java.net.URL
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ListBuffer
-import scala.concurrent.{future, promise, Await}
+import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -249,7 +249,7 @@ private object FaultToleranceTest extends App with Logging {
/** This includes Client retry logic, so it may take a while if the cluster is recovering. */
private def assertUsable() = {
- val f = future {
+ val f = Future {
try {
val res = sc.parallelize(0 until 10).collect()
assertTrue(res.toList == (0 until 10))
@@ -283,7 +283,7 @@ private object FaultToleranceTest extends App with Logging {
numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
}
- val f = future {
+ val f = Future {
try {
while (!stateValid()) {
Thread.sleep(1000)
@@ -405,7 +405,7 @@ private object SparkDocker {
}
private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
- val ipPromise = promise[String]()
+ val ipPromise = Promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
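For reference, `Promise[String]()` above now calls `Promise.apply`; the surrounding code (only partially shown in this hunk) completes the promise from a log-scanning callback. A hypothetical minimal sketch of that complete-from-callback pattern, with made-up names and log format:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseFromCallbackSketch extends App {
  val ipPromise = Promise[String]()

  // Stand-in for findIpAndLog: complete the promise the first time a line
  // matches; trySuccess is idempotent, so later matches are ignored.
  def onLine(line: String): Unit =
    if (line.startsWith("ip=")) ipPromise.trySuccess(line.stripPrefix("ip="))

  Seq("starting", "ip=10.0.0.7", "ip=10.0.0.8").foreach(onLine)
  println(Await.result(ipPromise.future, 1.second)) // 10.0.0.7
}
```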
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 179d3b9f20..df3c286a0a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -394,7 +394,7 @@ private[deploy] class Worker(
// rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = executors.values.map(_.appId).toSet
- val cleanupFuture = concurrent.future {
+ val cleanupFuture = concurrent.Future {
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index e13a442463..c347ab8dc8 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
-import scala.concurrent.future
+import scala.concurrent.Future
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
@@ -103,7 +103,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val rdd1 = rdd.map(x => x)
- future {
+ Future {
taskStartedSemaphore.acquire()
sc.cancelAllJobs()
taskCancelledSemaphore.release(100000)
@@ -126,7 +126,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})
// jobA is the one to be cancelled.
- val jobA = future {
+ val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}
@@ -191,7 +191,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})
// jobA is the one to be cancelled.
- val jobA = future {
+ val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
}
@@ -231,7 +231,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val f2 = rdd.countAsync()
// Kill one of the action.
- future {
+ Future {
sem1.acquire()
f1.cancel()
JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
@@ -247,7 +247,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// Cancel before launching any tasks
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
- future { f.cancel() }
+ Future { f.cancel() }
val e = intercept[SparkException] { f.get() }
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
@@ -263,7 +263,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
- future {
+ Future {
// Wait until some tasks were launched before we cancel the job.
sem.acquire()
f.cancel()
@@ -277,7 +277,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// Cancel before launching any tasks
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
- future { f.cancel() }
+ Future { f.cancel() }
val e = intercept[SparkException] { f.get() }
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
@@ -292,7 +292,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
})
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
- future {
+ Future {
sem.acquire()
f.cancel()
}
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 828153bdbf..c9c2fb2691 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -21,7 +21,7 @@ import java.io.InputStream
import java.util.concurrent.Semaphore
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.future
+import scala.concurrent.Future
import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._
@@ -149,7 +149,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
- future {
+ Future {
// Return the first two blocks, and wait till task completion before returning the 3rd one
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
@@ -211,7 +211,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
- future {
+ Future {
// Return the first block, and then fail.
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 0c42e2fc7c..b5413fbe2b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -36,7 +36,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
import scala.concurrent.duration._
val futures = (1 to 20).map { _ =>
- future {
+ Future {
GeneratePredicate.generate(EqualTo(Literal(1), Literal(1)))
GenerateMutableProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
GenerateOrdering.generate(Add(Literal(1), Literal(1)).asc :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 8b275e886c..943ad31c0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -77,7 +77,7 @@ case class BroadcastHashJoin(
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- future {
+ Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index db8edd169d..f48fc3b848 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -76,7 +76,7 @@ case class BroadcastHashOuterJoin(
// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- future {
+ Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ba3b26e1b7..865197e24c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{future, Await, ExecutionContext, Promise}
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.io.Source
import scala.util.{Random, Try}
@@ -362,7 +362,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
try {
// Start a very-long-running query that will take hours to finish, then cancel it in order
// to demonstrate that cancellation works.
- val f = future {
+ val f = Future {
statement.executeQuery(
"SELECT COUNT(*) FROM test_map " +
List.fill(10)("join test_map").mkString(" "))
@@ -380,7 +380,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
// Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
try {
- val sf = future {
+ val sf = Future {
statement.executeQuery(
"SELECT COUNT(*) FROM test_map " +
List.fill(4)("join test_map").mkString(" ")