author    Sean Owen <sowen@cloudera.com>    2014-03-25 10:21:25 -0700
committer Reynold Xin <rxin@apache.org>     2014-03-25 10:21:25 -0700
commit    71d4ed271bcbddb154643bd44297ed77190e75cf (patch)
tree      a6618c610ce4d9001ca8e5b08d4811e3105ecfc3
parent    134ace7fea7f772f5bafa9d11b8677cb7d311266 (diff)
SPARK-1316. Remove use of Commons IO
(This follows from a side point on SPARK-1133, in discussion of the PR: https://github.com/apache/spark/pull/164 )

Commons IO is barely used in the project, and can easily be replaced with equivalent calls to Guava or the existing Spark `Utils.scala` class.

Removing a dependency feels good, and this one in particular can get a little problematic since Hadoop uses it too.

Author: Sean Owen <sowen@cloudera.com>

Closes #226 from srowen/SPARK-1316 and squashes the following commits:

21efef3 [Sean Owen] Remove use of Commons IO
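The replacements in this diff follow a single pattern: Commons IO's FileUtils.writeStringToFile becomes Guava's Files.write, and FileUtils.deleteDirectory becomes Spark's own Utils.deleteRecursively. A minimal sketch of that pattern, for reference only (not part of the commit; the package and object names are hypothetical, and the code must sit under the org.apache.spark package because Utils is private[spark]):

    package org.apache.spark.example  // hypothetical package; Utils is private[spark]

    import java.io.File
    import java.nio.charset.Charset

    import com.google.common.io.Files   // Guava
    import org.apache.spark.util.Utils  // Spark's own file helpers

    object CommonsIoReplacementSketch {
      def main(args: Array[String]): Unit = {
        val dir = Files.createTempDir()
        val file = new File(dir, "example.txt")

        // Before: FileUtils.writeStringToFile(file, "1\n")
        Files.write("1\n", file, Charset.forName("UTF-8"))

        // Before: FileUtils.deleteDirectory(dir)
        Utils.deleteRecursively(dir)
      }
    }

This mirrors the FileUtils.writeStringToFile and FileUtils.deleteDirectory call sites replaced in the streaming tests and UtilsSuite below.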
-rw-r--r--  core/pom.xml                                                                      |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                             |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                        |  3
-rw-r--r--  pom.xml                                                                           |  5
-rw-r--r--  project/SparkBuild.scala                                                          |  8
-rw-r--r--  streaming/pom.xml                                                                 |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala  |  6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala         | 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala            | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala       | 28
10 files changed, 34 insertions(+), 54 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index a6f478b09b..eb6cc4d310 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -201,11 +201,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 13d9dbdd9a..ad87fda140 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -529,7 +529,10 @@ private[spark] object Utils extends Logging {
}
}
if (!file.delete()) {
- throw new IOException("Failed to delete: " + file)
+ // Delete can also fail if the file simply did not exist
+ if (file.exists()) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath)
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 8f55b2372c..eb8f591560 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -24,7 +24,6 @@ import java.nio.{ByteBuffer, ByteOrder}
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite
class UtilsSuite extends FunSuite {
@@ -136,7 +135,7 @@ class UtilsSuite extends FunSuite {
// Read some nonexistent bytes on both ends
assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
- FileUtils.deleteDirectory(tmpDir2)
+ Utils.deleteRecursively(tmpDir2)
}
test("deserialize long value") {
diff --git a/pom.xml b/pom.xml
index fa72d5f263..deb89b18ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -436,11 +436,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.4</version>
- </dependency>
- <dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.1</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21d2779d85..60f14ba37e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -259,8 +259,7 @@ object SparkBuild extends Build {
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
- "org.mockito" % "mockito-all" % "1.8.5" % "test",
- "commons-io" % "commons-io" % "2.4" % "test"
+ "org.mockito" % "mockito-all" % "1.8.5" % "test"
),
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -439,10 +438,7 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
- previousArtifact := sparkPreviousArtifact("spark-streaming"),
- libraryDependencies ++= Seq(
- "commons-io" % "commons-io" % "2.4"
- )
+ previousArtifact := sparkPreviousArtifact("spark-streaming")
)
def yarnCommonSettings = sharedSettings ++ Seq(
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 2cb8bde664..1953cc6883 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -74,10 +74,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 2bb616cfb8..c48a38590e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
import java.io.{File, ObjectInputStream, IOException}
+import java.nio.charset.Charset
import java.util.UUID
import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
@@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
- FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
var tries = 0
var done = false
while (!done && tries < maxTries) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 831e7c1471..0784e562ac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,10 +18,10 @@
package org.apache.spark.streaming
import java.io.File
+import java.nio.charset.Charset
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import org.apache.commons.io.FileUtils
import com.google.common.io.Files
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {
override def beforeFunction() {
super.beforeFunction()
- FileUtils.deleteDirectory(new File(checkpointDir))
+ Utils.deleteRecursively(new File(checkpointDir))
}
override def afterFunction() {
super.afterFunction()
if (ssc != null) ssc.stop()
- FileUtils.deleteDirectory(new File(checkpointDir))
+ Utils.deleteRecursively(new File(checkpointDir))
}
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
//var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
// wait to make sure that the file is written such that it gets shown in the file listings
Thread.sleep(1000)
}
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
@@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase {
// Restart stream computation
ssc.start()
for (i <- Seq(7, 8, 9)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
Thread.sleep(1000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index da9b04de1a..92e1b76d28 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -19,14 +19,9 @@ package org.apache.spark.streaming
import org.apache.spark.Logging
import org.apache.spark.streaming.util.MasterFailureTest
-import StreamingContext._
+import org.apache.spark.util.Utils
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import com.google.common.io.Files
import java.io.File
-import org.apache.commons.io.FileUtils
-import collection.mutable.ArrayBuffer
-
/**
* This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {
override def beforeFunction() {
super.beforeFunction()
- FileUtils.deleteDirectory(new File(directory))
+ Utils.deleteRecursively(new File(directory))
}
override def afterFunction() {
super.afterFunction()
- FileUtils.deleteDirectory(new File(directory))
+ Utils.deleteRecursively(new File(directory))
}
test("multiple failures with map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 95bf40ba75..74e73ebb34 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,21 +23,23 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{NetworkReceiver}
-import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
+import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import com.google.common.io.Files
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.Logging
-import scala.util.Random
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-import collection.JavaConversions._
-import com.google.common.io.Files
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
Thread.sleep(1000)
for (i <- 0 until input.size) {
val file = new File(testDir, i.toString)
- FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
logInfo("Created file " + file)
Thread.sleep(batchDuration.milliseconds)
Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// (whether the elements were received one in each interval is not verified)
assert(output.toList === expectedOutput.toList)
- FileUtils.deleteDirectory(testDir)
+ Utils.deleteRecursively(testDir)
// Enable manual clock back again for other tests
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")