aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHolden Karau <holden@us.ibm.com>2016-05-03 00:18:10 -0700
committerXiangrui Meng <meng@databricks.com>2016-05-03 00:18:10 -0700
commitf10ae4b1e169495af11b8e8123c60dd96174477e (patch)
tree323357bac8e3b933780625c92292a53c7043e17a
parentd8f528ceb61e3c2ac7ac97cd8147dafbb625932f (diff)
downloadspark-f10ae4b1e169495af11b8e8123c60dd96174477e.tar.gz
spark-f10ae4b1e169495af11b8e8123c60dd96174477e.tar.bz2
spark-f10ae4b1e169495af11b8e8123c60dd96174477e.zip
[SPARK-6717][ML] Clear shuffle files after checkpointing in ALS
## What changes were proposed in this pull request? When ALS is run with a checkpoint interval, during the checkpoint materialize the current state and cleanup the previous shuffles (non-blocking). ## How was this patch tested? Existing ALS unit tests, new ALS checkpoint cleanup unit tests added & shuffle files checked after ALS w/checkpointing run. Author: Holden Karau <holden@us.ibm.com> Author: Holden Karau <holden@pigscanfly.ca> Closes #11919 from holdenk/SPARK-6717-clear-shuffle-files-after-checkpointing-in-ALS.
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala35
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala203
3 files changed, 195 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index a51338c017..c895fb3206 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -212,7 +212,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
- /** Perform shuffle cleanup, asynchronously. */
+ /** Perform shuffle cleanup. */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning shuffle " + shuffleId)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 541923048a..509c944fed 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
-import org.apache.spark.Partitioner
+import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
@@ -706,13 +706,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
previousItemFactors.unpersist()
itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
// TODO: Generalize PeriodicGraphCheckpointer and use it here.
+ val deps = itemFactors.dependencies
if (shouldCheckpoint(iter)) {
- itemFactors.checkpoint() // itemFactors gets materialized in computeFactors.
+ itemFactors.checkpoint() // itemFactors gets materialized in computeFactors
}
val previousUserFactors = userFactors
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, implicitPrefs, alpha, solver)
if (shouldCheckpoint(iter)) {
+ ALS.cleanShuffleDependencies(sc, deps)
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
@@ -723,8 +725,10 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, solver = solver)
if (shouldCheckpoint(iter)) {
+ val deps = itemFactors.dependencies
itemFactors.checkpoint()
itemFactors.count() // checkpoint item factors and cut lineage
+ ALS.cleanShuffleDependencies(sc, deps)
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
@@ -1355,4 +1359,31 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
* satisfies this requirement, we simply use a type alias here.
*/
private[recommendation] type ALSPartitioner = org.apache.spark.HashPartitioner
+
+ /**
+ * Private function to clean up all of the shuffles files from the dependencies and their parents.
+ */
+ private[spark] def cleanShuffleDependencies[T](
+ sc: SparkContext,
+ deps: Seq[Dependency[_]],
+ blocking: Boolean = false): Unit = {
+ // If there is no reference tracking we skip clean up.
+ sc.cleaner.foreach { cleaner =>
+ /**
+ * Clean the shuffles & all of its parents.
+ */
+ def cleanEagerly(dep: Dependency[_]): Unit = {
+ if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) {
+ val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
+ cleaner.doCleanupShuffle(shuffleId, blocking)
+ }
+ val rdd = dep.rdd
+ val rddDeps = rdd.dependencies
+ if (rdd.getStorageLevel == StorageLevel.NONE && rddDeps != null) {
+ rddDeps.foreach(cleanEagerly)
+ }
+ }
+ deps.foreach(cleanEagerly)
+ }
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 4c4eb72cd1..1704037395 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -17,15 +17,19 @@
package org.apache.spark.ml.recommendation
+import java.io.File
import java.util.Random
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import scala.language.existentials
import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.ml.recommendation.ALS._
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
@@ -34,8 +38,9 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
class ALSSuite
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging {
@@ -255,37 +260,7 @@ class ALSSuite
rank: Int,
noiseStd: Double = 0.0,
seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = {
- // The assumption of the implicit feedback model is that unobserved ratings are more likely to
- // be negatives.
- val positiveFraction = 0.8
- val negativeFraction = 1.0 - positiveFraction
- val trainingFraction = 0.6
- val testFraction = 0.3
- val totalFraction = trainingFraction + testFraction
- val random = new Random(seed)
- val userFactors = genFactors(numUsers, rank, random)
- val itemFactors = genFactors(numItems, rank, random)
- val training = ArrayBuffer.empty[Rating[Int]]
- val test = ArrayBuffer.empty[Rating[Int]]
- for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) {
- val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1)
- val threshold = if (rating > 0) positiveFraction else negativeFraction
- val observed = random.nextDouble() < threshold
- if (observed) {
- val x = random.nextDouble()
- if (x < totalFraction) {
- if (x < trainingFraction) {
- val noise = noiseStd * random.nextGaussian()
- training += Rating(userId, itemId, rating + noise.toFloat)
- } else {
- test += Rating(userId, itemId, rating)
- }
- }
- }
- }
- logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " +
- s"and ${test.size} for test.")
- (sc.parallelize(training, 2), sc.parallelize(test, 2))
+ ALSSuite.genImplicitTestData(sc, numUsers, numItems, rank, noiseStd, seed)
}
/**
@@ -304,14 +279,7 @@ class ALSSuite
random: Random,
a: Float = -1.0f,
b: Float = 1.0f): Seq[(Int, Array[Float])] = {
- require(size > 0 && size < Int.MaxValue / 3)
- require(b > a)
- val ids = mutable.Set.empty[Int]
- while (ids.size < size) {
- ids += random.nextInt()
- }
- val width = b - a
- ids.toSeq.sorted.map(id => (id, Array.fill(rank)(a + random.nextFloat() * width)))
+ ALSSuite.genFactors(size, rank, random, a, b)
}
/**
@@ -520,6 +488,79 @@ class ALSSuite
}
}
+class ALSCleanerSuite extends SparkFunSuite {
+ test("ALS shuffle cleanup standalone") {
+ val conf = new SparkConf()
+ val localDir = Utils.createTempDir()
+ val checkpointDir = Utils.createTempDir()
+ def getAllFiles: Set[File] =
+ FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
+ try {
+ conf.set("spark.local.dir", localDir.getAbsolutePath)
+ val sc = new SparkContext("local[2]", "test", conf)
+ try {
+ sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+ // Test checkpoint and clean parents
+ val input = sc.parallelize(1 to 1000)
+ val keyed = input.map(x => (x % 20, 1))
+ val shuffled = keyed.reduceByKey(_ + _)
+ val keysOnly = shuffled.keys
+ val deps = keysOnly.dependencies
+ keysOnly.count()
+ ALS.cleanShuffleDependencies(sc, deps, true)
+ val resultingFiles = getAllFiles
+ assert(resultingFiles === Set())
+ // Ensure running count again works fine even if we kill the shuffle files.
+ keysOnly.count()
+ } finally {
+ sc.stop()
+ }
+ } finally {
+ Utils.deleteRecursively(localDir)
+ Utils.deleteRecursively(checkpointDir)
+ }
+ }
+
+ test("ALS shuffle cleanup in algorithm") {
+ val conf = new SparkConf()
+ val localDir = Utils.createTempDir()
+ val checkpointDir = Utils.createTempDir()
+ def getAllFiles: Set[File] =
+ FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
+ try {
+ conf.set("spark.local.dir", localDir.getAbsolutePath)
+ val sc = new SparkContext("local[2]", "test", conf)
+ try {
+ sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+ // Generate test data
+ val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
+ // Implicitly test the cleaning of parents during ALS training
+ val sqlContext = new SQLContext(sc)
+ import sqlContext.implicits._
+ val als = new ALS()
+ .setRank(1)
+ .setRegParam(1e-5)
+ .setSeed(0)
+ .setCheckpointInterval(1)
+ .setMaxIter(7)
+ val model = als.fit(training.toDF())
+ val resultingFiles = getAllFiles
+ // We expect the last shuffles files, block ratings, user factors, and item factors to be
+ // around but no more.
+ val pattern = "shuffle_(\\d+)_.+\\.data".r
+ val rddIds = resultingFiles.flatMap { f =>
+ pattern.findAllIn(f.getName()).matchData.map { _.group(1) } }
+ assert(rddIds.toSet.size === 4)
+ } finally {
+ sc.stop()
+ }
+ } finally {
+ Utils.deleteRecursively(localDir)
+ Utils.deleteRecursively(checkpointDir)
+ }
+ }
+}
+
class ALSStorageSuite
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging {
@@ -591,7 +632,7 @@ private class IntermediateRDDStorageListener extends SparkListener {
}
-object ALSSuite {
+object ALSSuite extends Logging {
/**
* Mapping from all Params to valid settings which differ from the defaults.
@@ -620,4 +661,82 @@ object ALSSuite {
"intermediateStorageLevel" -> "MEMORY_ONLY",
"finalStorageLevel" -> "MEMORY_AND_DISK_SER"
)
+
+ // Helper functions to generate test data we share between ALS test suites
+
+ /**
+ * Generates random user/item factors, with i.i.d. values drawn from U(a, b).
+ * @param size number of users/items
+ * @param rank number of features
+ * @param random random number generator
+ * @param a min value of the support (default: -1)
+ * @param b max value of the support (default: 1)
+ * @return a sequence of (ID, factors) pairs
+ */
+ private def genFactors(
+ size: Int,
+ rank: Int,
+ random: Random,
+ a: Float = -1.0f,
+ b: Float = 1.0f): Seq[(Int, Array[Float])] = {
+ require(size > 0 && size < Int.MaxValue / 3)
+ require(b > a)
+ val ids = mutable.Set.empty[Int]
+ while (ids.size < size) {
+ ids += random.nextInt()
+ }
+ val width = b - a
+ ids.toSeq.sorted.map(id => (id, Array.fill(rank)(a + random.nextFloat() * width)))
+ }
+
+ /**
+ * Generates an implicit feedback dataset for testing ALS.
+ *
+ * @param sc SparkContext
+ * @param numUsers number of users
+ * @param numItems number of items
+ * @param rank rank
+ * @param noiseStd the standard deviation of additive Gaussian noise on training data
+ * @param seed random seed
+ * @return (training, test)
+ */
+ def genImplicitTestData(
+ sc: SparkContext,
+ numUsers: Int,
+ numItems: Int,
+ rank: Int,
+ noiseStd: Double = 0.0,
+ seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = {
+ // The assumption of the implicit feedback model is that unobserved ratings are more likely to
+ // be negatives.
+ val positiveFraction = 0.8
+ val negativeFraction = 1.0 - positiveFraction
+ val trainingFraction = 0.6
+ val testFraction = 0.3
+ val totalFraction = trainingFraction + testFraction
+ val random = new Random(seed)
+ val userFactors = genFactors(numUsers, rank, random)
+ val itemFactors = genFactors(numItems, rank, random)
+ val training = ArrayBuffer.empty[Rating[Int]]
+ val test = ArrayBuffer.empty[Rating[Int]]
+ for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) {
+ val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1)
+ val threshold = if (rating > 0) positiveFraction else negativeFraction
+ val observed = random.nextDouble() < threshold
+ if (observed) {
+ val x = random.nextDouble()
+ if (x < totalFraction) {
+ if (x < trainingFraction) {
+ val noise = noiseStd * random.nextGaussian()
+ training += Rating(userId, itemId, rating + noise.toFloat)
+ } else {
+ test += Rating(userId, itemId, rating)
+ }
+ }
+ }
+ }
+ logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " +
+ s"and ${test.size} for test.")
+ (sc.parallelize(training, 2), sc.parallelize(test, 2))
+ }
}