author    Haoyuan Li <haoyuan@cs.berkeley.edu>    2014-04-04 20:36:24 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-04-04 20:38:20 -0700
commit    b50ddfde0342990979979e58348f54c10b500c90 (patch)
tree      cc7fa4d089375cded5056d9a93079e0b23a32ae7 /examples
parent    1347ebd4b52ffb9197fc4137a55dff6badb149ba (diff)
SPARK-1305: Support persisting RDDs directly to Tachyon
Moves PR#468 of apache-incubator-spark to apache-spark: "Adding an option to persist Spark RDD blocks into Tachyon."

Author: Haoyuan Li <haoyuan@cs.berkeley.edu>
Author: RongGu <gurongwalker@gmail.com>

Closes #158 from RongGu/master and squashes the following commits:

72b7768 [Haoyuan Li] merge master
9f7fa1b [Haoyuan Li] fix code style
ae7834b [Haoyuan Li] minor cleanup
a8b3ec6 [Haoyuan Li] merge master branch
e0f4891 [Haoyuan Li] better check offheap.
55b5918 [RongGu] address matei's comment on the replication of the offHeap StorageLevel
7cd4600 [RongGu] remove some logic code for TachyonStore's replication
51149e7 [RongGu] address aaron's comment on the return value of the remove() function in TachyonStore
8adfcfa [RongGu] address aaron's comment on inTachyonSize
120e48a [RongGu] changed the root-level dir name in Tachyon
5cc041c [Haoyuan Li] address aaron's comments
9b97935 [Haoyuan Li] address aaron's comments
d9a6438 [Haoyuan Li] fix for pyspark
77d2703 [Haoyuan Li] change python api
3dcace4 [Haoyuan Li] address matei's comments
91fa09d [Haoyuan Li] address patrick's comments
589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE
64348b2 [Haoyuan Li] update conf docs.
ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1
619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore
be79d77 [RongGu] find a way to clean up some unnecessary methods and classes to make the code simpler
49cc724 [Haoyuan Li] update docs with off_heap option
4572f9f [RongGu] preserving the old apply function API of StorageLevel
04301d3 [RongGu] rename StorageLevel.TACHYON to StorageLevel.OFF_HEAP
c9aeabf [RongGu] rename the StorageLevel.TACHYON as StorageLevel.OFF_HEAP
76805aa [RongGu] unify the config property name prefix; add the configs to docs/configuration.md
e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments
fd84156 [RongGu] use randomUUID to generate the sparkapp directory name on tachyon; minor code style fix
939e467 [Haoyuan Li] 0.4.1-thrift from maven central
86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo, but jenkins failed to download it; temporarily revert it back to 0.4.1
16c5798 [RongGu] make the tachyon dependency tachyon-0.4.1-thrift
eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem
6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
d827250 [RongGu] fix JsonProtocolSuite test failure
716e93b [Haoyuan Li] revert the version
ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift
2825a13 [RongGu] up-merge to the current master branch of apache spark
6a22c1a [Haoyuan Li] fix scalastyle
8968b67 [Haoyuan Li] exclude more libraries from the tachyon dependency, matching what referencing tachyon-client would pull in.
77be7e8 [RongGu] address mateiz's comment about the temp folder name problem; the implementation followed mateiz's advice.
1dcadf9 [Haoyuan Li] fix typo
bf278fa [Haoyuan Li] fix python tests
e82909c [Haoyuan Li] minor cleanup
776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR
8859371 [Haoyuan Li] various minor fixes and cleanup
e3ddbba [Haoyuan Li] add doc on using Tachyon cache mode.
fcaeab2 [Haoyuan Li] address Aaron's comment
e554b1e [Haoyuan Li] add python code
47304b3 [Haoyuan Li] make tachyonStore in BlockManager a lazy val; add more comments on StorageLevels.
dc8ef24 [Haoyuan Li] add old StorageLevel constructor
e01a271 [Haoyuan Li] update to tachyon 0.4.1
8011a96 [RongGu] fix a mistake brought into StorageLevel
70ca182 [RongGu] a small change in a comment
556978b [RongGu] fix the scalastyle errors
791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." Move PR#468 of apache-incubator-spark to apache-spark.
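
For orientation, here is a minimal sketch of the user-facing change this patch introduces: an RDD can now be persisted off-heap into Tachyon by passing the new StorageLevel.OFF_HEAP to persist(), exactly as the two new examples in this diff do. The object name, master URL, and data below are illustrative placeholders, and a reachable Tachyon deployment is assumed.

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    object OffHeapSketch {
      def main(args: Array[String]) {
        // Placeholder master URL; any cluster URL works here.
        val sc = new SparkContext("local", "OffHeapSketch")
        val data = sc.parallelize(1 to 1000)
        // New in this patch: OFF_HEAP caches the RDD's blocks in Tachyon,
        // off the JVM heap, instead of in executor memory.
        data.persist(StorageLevel.OFF_HEAP)
        println(data.map(_ * 2).reduce(_ + _))
        sc.stop()
      }
    }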
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkPi.scala            |  2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala | 80
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala     | 52
3 files changed, 133 insertions(+), 1 deletion(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index e5a09ecec0..d3babc3ed1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.examples
 
 import scala.math.random
+
 import org.apache.spark._
-import SparkContext._
 
 /** Computes an approximation to pi */
 object SparkPi {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
new file mode 100644
index 0000000000..53b303d658
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import java.util.Random
+import scala.math.exp
+import org.apache.spark.util.Vector
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.InputFormatInfo
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Logistic regression-based classification.
+ * This example uses Tachyon to persist RDDs during computation.
+ */
+object SparkTachyonHdfsLR {
+  val D = 10 // Number of dimensions
+  val rand = new Random(42)
+
+  case class DataPoint(x: Vector, y: Double)
+
+  def parsePoint(line: String): DataPoint = {
+    val tok = new java.util.StringTokenizer(line, " ")
+    val y = tok.nextToken.toDouble
+    val x = new Array[Double](D)
+    var i = 0
+    while (i < D) {
+      x(i) = tok.nextToken.toDouble; i += 1
+    }
+    DataPoint(new Vector(x), y)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>")
+      System.exit(1)
+    }
+    val inputPath = args(1)
+    val conf = SparkHadoopUtil.get.newConfiguration()
+    val sc = new SparkContext(args(0), "SparkTachyonHdfsLR",
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
+      InputFormatInfo.computePreferredLocations(
+        Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
+      ))
+    val lines = sc.textFile(inputPath)
+    val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP)
+    val ITERATIONS = args(2).toInt
+
+    // Initialize w to a random value
+    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    println("Initial w: " + w)
+
+    for (i <- 1 to ITERATIONS) {
+      println("On iteration " + i)
+      val gradient = points.map { p =>
+        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+      }.reduce(_ + _)
+      w -= gradient
+    }
+
+    println("Final w: " + w)
+    System.exit(0)
+  }
+}
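
For reference, the training loop in SparkTachyonHdfsLR above is plain batch gradient descent on the logistic loss with labels y in {-1, +1}; each iteration computes

    w <- w - sum over points p of (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x

with an implicit step size of 1, and because the parsed points are persisted with StorageLevel.OFF_HEAP, every pass after the first reads them back from Tachyon rather than re-reading and re-parsing the HDFS file. A typical invocation, assuming the bin/run-example launcher of this Spark era and a placeholder HDFS path and iteration count, would look like:

    ./bin/run-example org.apache.spark.examples.SparkTachyonHdfsLR local[2] hdfs://namenode:9000/lr_data.txt 5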
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
new file mode 100644
index 0000000000..ce78f0876e
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.math.random
+
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Computes an approximation to pi.
+ * This example uses Tachyon to persist RDDs during computation.
+ */
+object SparkTachyonPi {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkTachyonPi <master> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "SparkTachyonPi",
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val n = 100000 * slices
+
+    val rdd = spark.parallelize(1 to n, slices)
+    rdd.persist(StorageLevel.OFF_HEAP)
+    val count = rdd.map { i =>
+      val x = random * 2 - 1
+      val y = random * 2 - 1
+      if (x * x + y * y < 1) 1 else 0
+    }.reduce(_ + _)
+    println("Pi is roughly " + 4.0 * count / n)
+
+    spark.stop()
+  }
+}
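
As in SparkPi, the estimate in SparkTachyonPi works by sampling points uniformly in the square [-1, 1] x [-1, 1]; the fraction that lands inside the unit circle converges to pi/4, hence the final multiplication by 4.0. The OFF_HEAP persist is incidental to the math here, since the RDD is traversed only once; it exists to exercise the new Tachyon-backed storage path. Again assuming the bin/run-example launcher, a sample run with an optional slice count of 4 would be:

    ./bin/run-example org.apache.spark.examples.SparkTachyonPi local[2] 4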