aboutsummaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-04-09 01:14:46 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-09 01:14:46 -0700
commit87bd1f9ef7d547ee54a8a83214b45462e0751efb (patch)
treef815ba0464d366300b9bf3833ad3f5fdea2ca9eb /tools
parent9689b663a2a4947ad60795321c770052f3c637f1 (diff)
downloadspark-87bd1f9ef7d547ee54a8a83214b45462e0751efb.tar.gz
spark-87bd1f9ef7d547ee54a8a83214b45462e0751efb.tar.bz2
spark-87bd1f9ef7d547ee54a8a83214b45462e0751efb.zip
SPARK-1093: Annotate developer and experimental API's
This patch marks some existing classes as private[spark] and adds two types of API annotations: - `EXPERIMENTAL API` = experimental user-facing module - `DEVELOPER API - UNSTABLE` = developer-facing API that might change There is some discussion of the different mechanisms for doing this here: https://issues.apache.org/jira/browse/SPARK-1081 I was pretty aggressive with marking things private. Keep in mind that if we want to open something up in the future we can, but we can never reduce visibility. A few notes here: - In the past we've been inconsistent with the visiblity of the X-RDD classes. This patch marks them private whenever there is an existing function in RDD that can directly creat them (e.g. CoalescedRDD and rdd.coalesce()). One trade-off here is users can't subclass them. - Noted that compression and serialization formats don't have to be wire compatible across versions. - Compression codecs and serialization formats are semi-private as users typically don't instantiate them directly. - Metrics sources are made private - user only interacts with them through Spark's reflection Author: Patrick Wendell <pwendell@gmail.com> Author: Andrew Or <andrewor14@gmail.com> Closes #274 from pwendell/private-apis and squashes the following commits: 44179e4 [Patrick Wendell] Merge remote-tracking branch 'apache-github/master' into private-apis 042c803 [Patrick Wendell] spark.annotations -> spark.annotation bfe7b52 [Patrick Wendell] Adding experimental for approximate counts 8d0c873 [Patrick Wendell] Warning in SparkEnv 99b223a [Patrick Wendell] Cleaning up annotations e849f64 [Patrick Wendell] Merge pull request #2 from andrewor14/annotations 982a473 [Andrew Or] Generalize jQuery matching for non Spark-core API docs a01c076 [Patrick Wendell] Merge pull request #1 from andrewor14/annotations c1bcb41 [Andrew Or] DeveloperAPI -> DeveloperApi 0d48908 [Andrew Or] Comments and new lines (minor) f3954e0 [Andrew Or] Add identifier tags in comments to work around scaladocs bug 99192ef [Andrew Or] Dynamically add badges based on annotations 824011b [Andrew Or] Add support for injecting arbitrary JavaScript to API docs 037755c [Patrick Wendell] Some changes after working with andrew or f7d124f [Patrick Wendell] Small fixes c318b24 [Patrick Wendell] Use CSS styles e4c76b9 [Patrick Wendell] Logging f390b13 [Patrick Wendell] Better visibility for workaround constructors d6b0afd [Patrick Wendell] Small chang to existing constructor 403ba52 [Patrick Wendell] Style fix 870a7ba [Patrick Wendell] Work around for SI-8479 7fb13b2 [Patrick Wendell] Changes to UnionRDD and EmptyRDD 4a9e90c [Patrick Wendell] EXPERIMENTAL API --> EXPERIMENTAL c581dce [Patrick Wendell] Changes after building against Shark. 8452309 [Patrick Wendell] Style fixes 1ed27d2 [Patrick Wendell] Formatting and coloring of badges cd7a465 [Patrick Wendell] Code review feedback 2f706f1 [Patrick Wendell] Don't use floats 542a736 [Patrick Wendell] Small fixes cf23ec6 [Patrick Wendell] Marking GraphX as alpha d86818e [Patrick Wendell] Another naming change 5a76ed6 [Patrick Wendell] More visiblity clean-up 42c1f09 [Patrick Wendell] Using better labels 9d48cbf [Patrick Wendell] Initial pass
Diffstat (limited to 'tools')
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala103
1 files changed, 103 insertions, 0 deletions
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
new file mode 100644
index 0000000000..8e8c35615a
--- /dev/null
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.tools
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.SparkContext
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.Utils
+
+/**
+ * Internal utility for micro-benchmarking shuffle write performance.
+ *
+ * Writes simulated shuffle output from several threads and records the observed throughput.
+ */
+object StoragePerfTester {
+ def main(args: Array[String]) = {
+ /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+ val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
+
+ /** Number of map tasks. All tasks execute concurrently. */
+ val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
+
+ /** Number of reduce splits for each map task. */
+ val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
+
+ val recordLength = 1000 // ~1KB records
+ val totalRecords = dataSizeMb * 1000
+ val recordsPerMap = totalRecords / numMaps
+
+ val writeData = "1" * recordLength
+ val executor = Executors.newFixedThreadPool(numMaps)
+
+ System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.shuffle.sync", "true")
+
+ // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
+ val sc = new SparkContext("local[4]", "Write Tester")
+ val blockManager = sc.env.blockManager
+
+ def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+ val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+ new KryoSerializer(sc.conf))
+ val writers = shuffle.writers
+ for (i <- 1 to recordsPerMap) {
+ writers(i % numOutputSplits).write(writeData)
+ }
+ writers.map {w =>
+ w.commit()
+ total.addAndGet(w.fileSegment().length)
+ w.close()
+ }
+
+ shuffle.releaseWriters(true)
+ }
+
+ val start = System.currentTimeMillis()
+ val latch = new CountDownLatch(numMaps)
+ val totalBytes = new AtomicLong()
+ for (task <- 1 to numMaps) {
+ executor.submit(new Runnable() {
+ override def run() = {
+ try {
+ writeOutputBytes(task, totalBytes)
+ latch.countDown()
+ } catch {
+ case e: Exception =>
+ println("Exception in child thread: " + e + " " + e.getMessage)
+ System.exit(1)
+ }
+ }
+ })
+ }
+ latch.await()
+ val end = System.currentTimeMillis()
+ val time = (end - start) / 1000.0
+ val bytesPerSecond = totalBytes.get() / time
+ val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+ System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+ System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+ System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
+
+ executor.shutdown()
+ sc.stop()
+ }
+}