path: root/core/src/test
Commit message | Author | Age | Files | Lines
* [SPARK-13241][WEB UI] Added long values for dates in ApplicationAttemptInfo API | Alex Bozarth | 2016-04-01 | 8 | -1/+87
## What changes were proposed in this pull request?
Adding long values for each Date in the ApplicationAttemptInfo API for easier use in code.
## How was this patch tested?
Tested with dev/run-tests.
Author: Alex Bozarth <ajbozart@us.ibm.com> Closes #11326 from ajbozarth/spark13241.
* [SPARK-13992] Add support for off-heap caching | Josh Rosen | 2016-04-01 | 7 | -48/+86
This patch adds support for caching blocks in the executor processes using direct / off-heap memory.
## User-facing changes
**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication.
**Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap.
**Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction.
**Memory management policies**: the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes.
**Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` affects the default storage level for cached SQL tables.
## Internal changes
- Renamed `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream`. It now returns a `ChunkedByteBuffer` instead of an array of byte arrays, and its constructor now accepts an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers. Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory.
- The `MemoryStore`'s MemoryEntries now track whether blocks are stored on- or off-heap.
- `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa).
- Off-heap buffers are now properly de-allocated during MemoryStore eviction.
- The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11805 from JoshRosen/off-heap-caching.
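As a usage illustration of the semantics described above (the configuration values, app name, and data set below are example choices, not part of the patch), off-heap caching can be exercised roughly like this:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Example configuration: enable an off-heap pool and give it an absolute size.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("off-heap-caching-sketch")
  .set("spark.memory.offHeap.enabled", "true")  // off-heap execution memory (independent of caching)
  .set("spark.memory.offHeap.size", "1g")       // absolute off-heap budget
  .set("spark.memory.storageFraction", "0.5")   // share protected from eviction by execution

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000000)

// With this patch, OFF_HEAP means "serialized, cached in off-heap memory or on disk".
rdd.persist(StorageLevel.OFF_HEAP)
rdd.count()
```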
* [SPARK-12343][YARN] Simplify Yarn client and client argument | jerryshao | 2016-04-01 | 1 | -9/+10
## What changes were proposed in this pull request?
Currently in Spark on YARN, configurations can be passed through SparkConf, env and command arguments; some parts are duplicated, like client arguments and SparkConf. So here I propose to simplify the command arguments.
## How was this patch tested?
This patch was tested manually and with unit tests. CC vanzin tgravescs, please help review this proposal. The original purpose of this JIRA is to remove `ClientArguments`; through refactoring, some arguments like `--class` and `--arg` are not so easy to replace, so here I remove most of the command line arguments and only keep the minimal set.
Author: jerryshao <sshao@hortonworks.com> Closes #11603 from jerryshao/SPARK-12343.
* [SPARK-11327][MESOS] Dispatcher does not respect all args from the Submit request | Jo Voordeckers | 2016-03-31 | 1 | -0/+36
Supersedes https://github.com/apache/spark/pull/9752
Author: Jo Voordeckers <jo.voordeckers@gmail.com> Author: Iulian Dragos <jaguarul@gmail.com> Closes #10370 from jayv/mesos_cluster_params.
* [SPARK-14243][CORE] Update task metrics when removing blocks | jeanlyn | 2016-03-31 | 1 | -0/+10
## What changes were proposed in this pull request?
This PR uses `incUpdatedBlockStatuses` to update `updatedBlockStatuses` when removing blocks, making sure `BlockManager` correctly updates `updatedBlockStatuses`.
## How was this patch tested?
test("updated block statuses") in BlockManagerSuite.scala
Author: jeanlyn <jeanlyn92@gmail.com> Closes #12091 from jeanlyn/updateBlock.
* [SPARK-13845][CORE] Using onBlockUpdated to replace onTaskEnd, avoiding driver OOM | jeanlyn | 2016-03-28 | 5 | -73/+71
## What changes were proposed in this pull request?
We have a streaming job using `FlumePollInputStream` that always hits driver OOM after a few days; here is some driver heap dump before the OOM:
```
 num     #instances         #bytes  class name
----------------------------------------------
   1:      13845916      553836640  org.apache.spark.storage.BlockStatus
   2:      14020324      336487776  org.apache.spark.storage.StreamBlockId
   3:      13883881      333213144  scala.collection.mutable.DefaultEntry
   4:          8907       89043952  [Lscala.collection.mutable.HashEntry;
   5:         62360       65107352  [B
   6:        163368       24453904  [Ljava.lang.Object;
   7:        293651       20342664  [C
...
```
`BlockStatus` and `StreamBlockId` keep growing, and the driver OOMs in the end. After investigating, I found that `executorIdToStorageStatus` in `StorageStatusListener` seems to never remove the blocks from `StorageStatus`. In order to fix the issue, this PR uses `onBlockUpdated` to replace `onTaskEnd`, so we can update the block information (add blocks, drop blocks from memory to disk, and delete blocks) in time.
## How was this patch tested?
Existing unit tests and manual tests
Author: jeanlyn <jeanlyn92@gmail.com> Closes #11779 from jeanlyn/fix_driver_oom.
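For context, here is a minimal sketch (not part of the patch) of a listener consuming the `onBlockUpdated` event that the fixed `StorageStatusListener` now relies on; the field names follow the public `SparkListenerBlockUpdated` event:
```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

// Illustrative listener: block additions, drops to disk, and removals all arrive
// here, so per-block bookkeeping can be kept up to date (and pruned) in time.
class BlockTrackingListener extends SparkListener {
  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
    val info = blockUpdated.blockUpdatedInfo
    println(s"block ${info.blockId} on executor ${info.blockManagerId.executorId}: " +
      s"level=${info.storageLevel} mem=${info.memSize} disk=${info.diskSize}")
  }
}
// sc.addSparkListener(new BlockTrackingListener())  // attach to an existing SparkContext
```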
* [SPARK-14169][CORE] Add UninterruptibleThread | Shixiong Zhu | 2016-03-28 | 1 | -0/+159
## What changes were proposed in this pull request?
Extract the workaround for HADOOP-10622 introduced by #11940 into UninterruptibleThread so that we can test and reuse it.
## How was this patch tested?
Unit tests
Author: Shixiong Zhu <shixiong@databricks.com> Closes #11971 from zsxwing/uninterrupt.
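A rough sketch of how the utility is meant to be used (UninterruptibleThread is an internal class; the method shape below is assumed from the patch description and may differ in detail):
```scala
import org.apache.spark.util.UninterruptibleThread

// Assumed usage: interrupts delivered while inside runUninterruptibly are deferred
// until the block completes, which is the HADOOP-10622 workaround being extracted.
val worker = new UninterruptibleThread("example-worker") {
  override def run(): Unit = {
    runUninterruptibly {
      // e.g. an HDFS call that must not be left half-done by Thread.interrupt()
    }
  }
}
worker.start()
worker.interrupt()  // takes effect only after the uninterruptible block exits
worker.join()
```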
* [SPARK-14052][SQL] Build a BytesToBytesMap directly in HashedRelation | Davies Liu | 2016-03-28 | 1 | -13/+51
## What changes were proposed in this pull request?
Currently, for keys that cannot fit within a long, we build a hash map for UnsafeHashedRelation, which is converted to a BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly to have better memory efficiency. In order to do that, BytesToBytesMap should support multiple (K,V) pairs with the same K: Location.putNewKey() is renamed to Location.append(), which can append multiple values for the same key (same Location), and `Location.newValue()` is added to find the next value for the same key.
## How was this patch tested?
Existing tests. Added a benchmark for broadcast hash join with duplicated keys.
Author: Davies Liu <davies@databricks.com> Closes #11870 from davies/map2.
* [SPARK-13742][CORE] Add non-iterator interface to RandomSampler | Liang-Chi Hsieh | 2016-03-28 | 2 | -0/+199
JIRA: https://issues.apache.org/jira/browse/SPARK-13742
## What changes were proposed in this pull request?
`RandomSampler.sample` currently accepts an iterator as input and outputs another iterator. This makes it inappropriate to use in whole-stage codegen of the `Sampler` operator (#11517). This change adds a non-iterator interface to `RandomSampler`: a new method `def sample(): Int` on the trait `RandomSampler`. Since we don't need to know the actual values of the sampled items, this new method takes no arguments. It decides whether to sample the next item or not and returns how many times the next item will be sampled. For `BernoulliSampler` and `BernoulliCellSampler`, the returned sampling count can only be 0 or 1, i.e. whether to sample the next item or not. For `PoissonSampler`, the returned value can be more than 1, meaning the next item will be sampled multiple times.
## How was this patch tested?
Tests are added to `RandomSamplerSuite`.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Author: Liang-Chi Hsieh <viirya@appier.com> Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #11578 from viirya/random-sampler-no-iterator.
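A small sketch of the new non-iterator interface described above (fractions and seed are arbitrary example values; the samplers live in `org.apache.spark.util.random`):
```scala
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

val bernoulli = new BernoulliSampler[Int](0.3)
bernoulli.setSeed(42L)
val include: Int = bernoulli.sample()  // 0 or 1: whether to include the next item

val poisson = new PoissonSampler[Int](2.0)
poisson.setSeed(42L)
val copies: Int = poisson.sample()     // may exceed 1: next item sampled multiple times
```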
* [SPARK-14135] Add off-heap storage memory bookkeeping support to MemoryManager | Josh Rosen | 2016-03-26 | 5 | -72/+88
This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, a requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools, and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter to specify whether allocations should be performed on- or off-heap. In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation).
Author: Josh Rosen <joshrosen@databricks.com> Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
* [SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore | Josh Rosen | 2016-03-24 | 2 | -34/+144
When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code will unroll the entire iterator as Java objects in memory, then will turn around and serialize an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements. Instead, I think that we should incrementally serialize blocks while unrolling them. A downside to incremental serialization is the fact that we will need to deserialize the partially-unrolled data in case there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance losses as a result of extra serialization in that hopefully-rare case.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11791 from JoshRosen/serialize-incrementally.
* Revert "[SPARK-2208] Fix for local metrics tests can fail on fast machines". ↵Sean Owen2016-03-241-25/+15
| | | | | | The test appears to still be flaky after this change, or more flaky. This reverts commit 5519760e0fe7d52170b38a52ce3d670d158e2aba.
* [SPARK-2208] Fix for local metrics tests can fail on fast machines | Joan | 2016-03-24 | 1 | -15/+25
## What changes were proposed in this pull request?
A fix for local metrics tests that can fail on fast machines. This is probably what was suggested in #3380 by aarondav.
## How was this patch tested?
CI tests. Cheers
Author: Joan <joan@goyeau.com> Closes #11747 from joan38/SPARK-2208-Local-metrics-tests.
* [SPARK-14110][CORE] PipedRDD to print the command run on non-zero exit | Tejas Patil | 2016-03-24 | 1 | -5/+17
## What changes were proposed in this pull request?
In case of a failure in the subprocess launched in PipedRDD, the failure exception reads “Subprocess exited with status XXX”. Debugging this is not easy for users, especially if there are multiple pipe() operations in the Spark application. Changes done:
- Changed the exception message when a non-zero exit code is seen.
- If the reader and writer threads see an exception, simply log the command that was run. The current model is to propagate the exception "as is" so that upstream Spark logic will take the right action based on what the exception was (e.g. for fetch failure, it needs to retry; but for some fatal exception, it will decide to fail the stage / job). So wrapping the exception with a generic exception will not work. Altering the exception message keeps that guarantee but is ugly (plus not all exceptions might have a constructor for a string message).
## How was this patch tested?
- Added a new test case
- Ran all existing tests for PipedRDD
Author: Tejas Patil <tejasp@fb.com> Closes #11927 from tejasapatil/SPARK-14110-piperdd-failure.
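For reference, a minimal pipe() usage sketch (command and data are arbitrary; the exact wording of the improved exception message is paraphrased):
```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pipe-sketch"))
val rdd = sc.parallelize(Seq("spark", "hadoop", "flink"))

// Each element is written to the subprocess's stdin; its stdout becomes the output RDD.
val upper = rdd.pipe(Seq("tr", "a-z", "A-Z"))
println(upper.collect().toSeq)  // Seq(SPARK, HADOOP, FLINK)

// If the subprocess exits with a non-zero status, the failure exception raised after
// this change also names the command that was run, not just the exit code.
```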
* [SPARK-14025][STREAMING][WEBUI] Fix streaming job descriptions on the event timeline | Liwei Lin | 2016-03-23 | 1 | -9/+65
## What changes were proposed in this pull request?
Removed the extra `<a href=...>...</a>` for each streaming job's description on the event timeline.
### [Before]
![before](https://cloud.githubusercontent.com/assets/15843379/13898653/0a6c1838-ee13-11e5-9761-14bb7b114c13.png)
### [After]
![after](https://cloud.githubusercontent.com/assets/15843379/13898650/012b8808-ee13-11e5-92a6-64aff0799c83.png)
## How was this patch tested?
Test suites, manual checks (see screenshots above)
Author: Liwei Lin <proflin.me@gmail.com> Author: proflin <proflin.me@gmail.com> Closes #11845 from lw-lin/description-event-line.
* [SPARK-14075] Refactor MemoryStore to be testable independent of BlockManager | Josh Rosen | 2016-03-23 | 7 | -252/+333
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.
- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
* [SPARK-13990] Automatically pick serializer when caching RDDs | Josh Rosen | 2016-03-21 | 4 | -28/+39
Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this patch modifies Spark's BlockManager to use an RDD's ClassTags in order to select the best serializer to use when caching RDD blocks. When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and store those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager. There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
* [SPARK-14011][CORE][SQL] Enable `LineLength` Java checkstyle rule | Dongjoon Hyun | 2016-03-21 | 4 | -238/+245
## What changes were proposed in this pull request?
The [Spark Coding Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) has a 100-character limit on lines, but it has been disabled for Java since 11/09/15. This PR enables the **LineLength** checkstyle rule again. To help with that, it also introduces **RedundantImport** and **RedundantModifier**. The following is the diff on `checkstyle.xml`:
```xml
-        <!-- TODO: 11/09/15 disabled - the lengths are currently > 100 in many places -->
-        <!--
        <module name="LineLength">
            <property name="max" value="100"/>
            <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
        </module>
-        -->
        <module name="NoLineWrap"/>
        <module name="EmptyBlock">
            <property name="option" value="TEXT"/>
@@ -167,5 +164,7 @@
        </module>
        <module name="CommentsIndentation"/>
        <module name="UnusedImports"/>
+        <module name="RedundantImport"/>
+        <module name="RedundantModifier"/>
```
## How was this patch tested?
Currently, `lint-java` is disabled in Jenkins, so it needs a manual test. After passing the Jenkins tests, `dev/lint-java` should pass locally.
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11831 from dongjoon-hyun/SPARK-14011.
* [SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore | Josh Rosen | 2016-03-17 | 3 | -15/+111
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer. This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted. This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not been implemented yet).
Author: Josh Rosen <joshrosen@databricks.com> Closes #11748 from JoshRosen/chunked-block-serialization.
* [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging | Wenchen Fan | 2016-03-17 | 16 | -7/+25
## What changes were proposed in this pull request?
Logging was made private in Spark 2.0. If we move it, then users would be able to create a Logging trait themselves to avoid changing their own code.
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com> Closes #11764 from cloud-fan/logger.
* [SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types | Josh Rosen | 2016-03-16 | 5 | -24/+24
Because ClassTags are available when constructing ShuffledRDD, we can use them to automatically use Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo. This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers. In a planned followup patch, I will extend the BlockManager APIs so that we're able to use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).
Author: Josh Rosen <joshrosen@databricks.com> Closes #11755 from JoshRosen/automatically-pick-best-serializer.
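The kind of ClassTag-driven decision described above can be pictured with the following illustrative check (this is not the actual `SerializerManager` implementation, just a sketch of the idea):
```scala
import scala.reflect.{classTag, ClassTag}

// Kryo is considered safe for primitives, primitive arrays, and strings.
def kryoSafe(tag: ClassTag[_]): Boolean = {
  val cls = tag.runtimeClass
  cls.isPrimitive ||
    (cls.isArray && cls.getComponentType.isPrimitive) ||
    cls == classOf[String]
}

def serializerFor(key: ClassTag[_], value: ClassTag[_]): String =
  if (kryoSafe(key) && kryoSafe(value)) "KryoSerializer" else "default serializer"

serializerFor(classTag[Int], classTag[String])       // "KryoSerializer"
serializerFor(classTag[Int], classTag[Seq[String]])  // "default serializer"
```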
* [SPARK-13281][CORE] Switch broadcast of RDD to exception from warning | Wesley Tang | 2016-03-16 | 1 | -0/+7
## What changes were proposed in this pull request?
In SparkContext, throw IllegalArgumentException when trying to broadcast an RDD directly, instead of just logging a warning.
## How was this patch tested?
mvn clean install; added a unit test in BroadcastSuite.
Author: Wesley Tang <tangmingjun@mininglamp.com> Closes #11735 from breakdawn/master.
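A quick sketch of the user-visible effect (the exception message below is approximate):
```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("broadcast-check"))
val rdd = sc.parallelize(1 to 10)

// Broadcasting an RDD directly now fails fast instead of only logging a warning.
try sc.broadcast(rdd) catch {
  case e: IllegalArgumentException => println(s"rejected as expected: ${e.getMessage}")
}

// The supported pattern: collect (or otherwise materialize) first, then broadcast.
val data = sc.broadcast(rdd.collect())
```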
* [SPARK-13793][CORE] PipedRDD doesn't propagate exceptions while reading parent RDD | Tejas Patil | 2016-03-16 | 1 | -0/+21
## What changes were proposed in this pull request?
PipedRDD creates a child thread to read the output of the parent stage and feed it to the pipe process. This change uses a variable to save any exception thrown in the child thread and then propagates the exception in the main thread if the variable was set.
## How was this patch tested?
- Added a unit test
- Ran all the existing tests in PipedRDDSuite and they all pass with the change
- Tested the patch with a real pipe() job, bounced the executor node which ran the parent stage to simulate a fetch failure, and observed that the parent stage was re-run.
Author: Tejas Patil <tejasp@fb.com> Closes #11628 from tejasapatil/pipe_rdd.
* [SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up | Sean Owen | 2016-03-16 | 12 | -28/+30
## What changes were proposed in this pull request?
Follow up to https://github.com/apache/spark/pull/11657
- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- Fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, fix about 15 more build warnings
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com> Closes #11725 from srowen/SPARK-13823.2.
* [SPARK-10907][SPARK-6157] Remove pendingUnrollMemory from MemoryStore | Josh Rosen | 2016-03-14 | 1 | -43/+34
This patch refactors the MemoryStore to remove the concept of `pendingUnrollMemory`. It also fixes SPARK-6157: "Unrolling with MEMORY_AND_DISK should always release memory". Key changes:
- Inline `MemoryStore.tryToPut` at its three call sites in the `MemoryStore`.
- Inline `MemoryStore.unrollSafely` at its only call site (in `MemoryStore.putIterator`).
- Inline `MemoryManager.acquireStorageMemory` at its call sites.
- Simplify the code as a result of this inlining (some parameters have fixed values after inlining, so lots of branches can be removed).
- Remove the `pendingUnrollMemory` map by returning the amount of unroll memory allocated when returning an iterator after a failed `putIterator` call.
- Change `putIterator` to return an instance of `PartiallyUnrolledIterator`, a special iterator subclass which will automatically free the unroll memory of its partially-unrolled elements when the iterator is consumed. To handle cases where the iterator is not consumed (e.g. when a MEMORY_ONLY put fails), `PartiallyUnrolledIterator` exposes a `close()` method which may be called to discard the unrolled values and free their memory.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11613 from JoshRosen/cleanup-unroll-memory.
* [SPARK-13054] Always post TaskEnd event for tasks | Thomas Graves | 2016-03-14 | 1 | -0/+58
I am using dynamic container allocation and speculation and am seeing issues with the active task accounting. The Executor UI still shows active tasks on an executor but the job/stage is all completed. I think it's also affecting dynamic allocation being able to release containers because it thinks there are still tasks. There are multiple issues with this:
- If the task end for tasks (in this case probably because of speculation) comes in after the stage is finished, then DAGScheduler.handleTaskCompletion will skip the task completion event.
Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com> Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com> Author: Tom Graves <tgraves@yahoo-inc.com> Closes #10951 from tgravescs/SPARK-11701.
* [SPARK-12583][MESOS] Mesos shuffle service: Don't delete shuffle files before application has stopped | Bertrand Bossy | 2016-03-14 | 1 | -1/+2
## Problem description
The Mesos shuffle service has been completely unusable since Spark 1.6.0. The problem seems to occur since the move from akka to netty in the networking layer. Until now, a connection from the driver to each shuffle service was used as a signal for the shuffle service to determine whether the driver is still running. Since 1.6.0, this connection is closed after spark.shuffle.io.connectionTimeout (or spark.network.timeout if the former is not set) due to it being idle. The shuffle service interprets this as a signal that the driver has stopped, despite the driver still being alive. Thus, shuffle files are deleted before the application has stopped.
### Context and analysis
spark shuffle fails with mesos after 2mins: https://issues.apache.org/jira/browse/SPARK-12583
External shuffle service broken w/ Mesos: https://issues.apache.org/jira/browse/SPARK-13159
This is a follow up on #11207.
## What changes were proposed in this pull request?
This PR adds a heartbeat signal from the Driver (in MesosExternalShuffleClient) to all registered external mesos shuffle service instances. In MesosExternalShuffleBlockHandler, a thread periodically checks whether a driver has timed out and cleans an application's shuffle files if this is the case.
## How was this patch tested?
This patch has been tested on a small mesos test cluster using the spark-shell. Log output from the mesos shuffle service:
```
16/02/19 15:13:45 INFO mesos.MesosExternalShuffleBlockHandler: Received registration request from app 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 (remote address /xxx.xxx.xxx.xxx:52391, heartbeat timeout 120000 ms).
16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-c84c0697-a3f9-4f61-9c64-4d3ee227c047], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-bf46497a-de80-47b9-88f9-563123b59e03], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/19 15:16:02 INFO mesos.MesosExternalShuffleBlockHandler: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 timed out. Removing shuffle files.
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 removed, cleanupLocalDirs = true
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3}'s 1 local dirs
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7}'s 1 local dirs
```
Note: there are 2 executors running on this slave.
Author: Bertrand Bossy <bertrand.bossy@teralytics.net> Closes #11272 from bbossy/SPARK-12583-mesos-shuffle-service-heartbeat.
* [SPARK-13746][TESTS] Stop using deprecated SynchronizedSet | Wilson Wu | 2016-03-14 | 1 | -16/+25
The trait SynchronizedSet in package mutable is deprecated.
Author: Wilson Wu <wilson888888888@gmail.com> Closes #11580 from wilson888888888/spark-synchronizedset.
* [MINOR][DOCS] Fix more typos in comments/strings | Dongjoon Hyun | 2016-03-14 | 11 | -16/+16
## What changes were proposed in this pull request?
This PR fixes 135 typos over 107 files:
* 121 typos in comments
* 11 typos in testcase names
* 3 typos in log messages
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11689 from dongjoon-hyun/fix_more_typos.
* [SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items) | Sean Owen | 2016-03-13 | 13 | -47/+53
## What changes were proposed in this pull request?
- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on the platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes on specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or "UTF-8" (which means handling `UnsupportedEncodingException`)
- Also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com> Closes #11657 from srowen/SPARK-13823.
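The convention being standardized looks roughly like this (a before/after sketch, not code from the patch):
```scala
import java.nio.charset.StandardCharsets

val s = "héllo Spark"

// Before: platform-default or string-named charsets.
//   s.getBytes("UTF-8")   // names the charset as a string (checked exception in Java)
//   new String(bytes)     // platform default charset: not portable

// After: the java.nio.charset.StandardCharsets constant, everywhere.
val bytes = s.getBytes(StandardCharsets.UTF_8)
val text = new String(bytes, StandardCharsets.UTF_8)
assert(text == s)
```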
* [SPARK-13328][CORE] Poor read performance for broadcast variables with dynamic resource allocation | Nezih Yigitbasi | 2016-03-11 | 1 | -3/+81
When dynamic resource allocation is enabled, fetching broadcast variables from removed executors was causing job failures, and SPARK-9591 fixed this problem by trying all locations of a block before giving up. However, the locations of a block are retrieved only once from the driver in this process, and the locations in this list can be stale due to dynamic resource allocation. This situation gets worse when running on a large cluster as the size of this location list can be in the order of several hundreds, out of which there may be tens of stale entries. What we have observed is that with the default settings of 3 max retries and 5s between retries (that's 15s per location) the time it takes to read a broadcast variable can be as high as ~17m (70 failed attempts * 15s/attempt).
Author: Nezih Yigitbasi <nyigitbasi@netflix.com> Closes #11241 from nezihyigitbasi/SPARK-13328.
* [SPARK-13604][CORE] Sync worker's state after registering with master | Shixiong Zhu | 2016-03-10 | 1 | -5/+51
## What changes were proposed in this pull request?
Here are all the cases in which Master cannot talk with Worker for a while and then the network comes back:
1. Master doesn't know about the network issue (not yet timed out)
   a. Worker doesn't know about the network issue (onDisconnected is not called): Worker keeps sending Heartbeat. Both Worker and Master don't know about the network issue. Nothing to do. (Finally, Master will notice the heartbeat timeout if the network is not recovered.)
   b. Worker knows about the network issue (onDisconnected is called): Worker stops sending Heartbeat and sends `RegisterWorker` to master. Master will reply `RegisterWorkerFailed("Duplicate worker ID")`. Worker calls "System.exit(1)". (Finally, Master will notice the heartbeat timeout if the network is not recovered.) (May leak driver processes. See [SPARK-13602](https://issues.apache.org/jira/browse/SPARK-13602).)
2. Worker times out (Master knows about the network issue). In such cases, master removes the Worker and its executors and drivers.
   a. Worker doesn't know about the network issue (onDisconnected is not called): Worker keeps sending Heartbeat. If the network is back, say Master receives Heartbeat, Master sends `ReconnectWorker` to Worker, and Worker sends `RegisterWorker` to master. Master accepts `RegisterWorker` but doesn't know about the executors and drivers in the Worker. (May leak executors.)
   b. Worker knows about the network issue (onDisconnected is called): Worker stops sending `Heartbeat` and will send `RegisterWorker` to master. Master accepts `RegisterWorker` but doesn't know about the executors and drivers in the Worker. (May leak executors.)
This PR fixes the executor and driver leaks in 2.a and 2.b when Worker re-registers with Master. The approach is to make Worker send `WorkerLatestState` to sync the state after registering with master successfully. Then Master will ask Worker to kill the unknown executors and drivers. Note: Worker cannot just kill executors after registering with master because, in the worker, `LaunchExecutor` and `RegisteredWorker` are processed in two threads. If `LaunchExecutor` happens before `RegisteredWorker`, Worker's executor list will contain new executors after Master accepts `RegisterWorker`, and we should not kill those executors. So we send the list to Master and let Master tell Worker which executors should be killed.
## How was this patch tested?
test("SPARK-13604: Master should ask Worker kill unknown executors and drivers")
Author: Shixiong Zhu <shixiong@databricks.com> Closes #11455 from zsxwing/orphan-executors.
* [SPARK-3854][BUILD] Scala style: require spaces before `{` | Dongjoon Hyun | 2016-03-10 | 3 | -3/+3
## What changes were proposed in this pull request?
Since the opening curly brace, '{', has many usages as discussed in [SPARK-3854](https://issues.apache.org/jira/browse/SPARK-3854), this PR adds a ScalaStyle rule to prevent the '){' pattern for the following majority case and fixes the code accordingly. If we enforce this in ScalaStyle from now on, it will improve the Scala code quality and reduce review time.
```
// Correct:
if (true) {
  println("Wow!")
}

// Incorrect:
if (true){
  println("Wow!")
}
```
IntelliJ also shows new warnings based on this.
## How was this patch tested?
Pass the Jenkins ScalaStyle test.
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11637 from dongjoon-hyun/SPARK-3854.
* [SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk stores | Josh Rosen | 2016-03-10 | 8 | -68/+87
Today, both the MemoryStore and DiskStore implement a common `BlockStore` API, but I feel that this API is inappropriate because it abstracts away important distinctions between the behavior of these two stores. For instance, the disk store doesn't have a notion of storing deserialized objects, so it's confusing for it to expose object-based APIs like putIterator() and getValues() instead of only exposing binary APIs and pushing the responsibilities of serialization and deserialization to the client. Similarly, the DiskStore put() methods accepted a `StorageLevel` parameter even though the disk store can only store blocks in one form. As part of a larger BlockManager interface cleanup, this patch removes the BlockStore interface and refines the MemoryStore and DiskStore interfaces to reflect more narrow sets of responsibilities for those components. Some of the benefits of this interface cleanup are reflected in simplifications to several unit tests to eliminate now-unnecessary mocking, significant simplification of the BlockManager's `getLocal()` and `doPut()` methods, and a narrower API between the MemoryStore and DiskStore.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11534 from JoshRosen/remove-blockstore-interface.
* [SPARK-13727][CORE] SparkConf.contains does not consider deprecated keys | bomeng | 2016-03-10 | 1 | -0/+14
The contains() method does not return consistently with get() if the key is deprecated. For example:
import org.apache.spark.SparkConf
val conf = new SparkConf()
conf.set("spark.io.compression.lz4.block.size", "12345")   // displays a deprecation warning message
conf.get("spark.io.compression.lz4.block.size")            // returns 12345
conf.get("spark.io.compression.lz4.blockSize")             // returns 12345
conf.contains("spark.io.compression.lz4.block.size")       // returns true
conf.contains("spark.io.compression.lz4.blockSize")        // returns false
The fix makes contains() and get() consistent. I've added a test case for this; unit tests should be sufficient.
Author: bomeng <bmeng@us.ibm.com> Closes #11568 from bomeng/SPARK-13727.
* [SPARK-13492][MESOS] Configurable Mesos framework webui URL | Sergiusz Urbaniak | 2016-03-09 | 2 | -1/+66
## What changes were proposed in this pull request?
Previously the Mesos framework webui URL was derived only from the Spark UI address, leaving no possibility to configure it. This commit makes it configurable; if unset, it falls back to the previous behavior. Motivation: this change is necessary in order to be able to install Spark on DCOS and to be able to give it a custom service link. The configured `webui_url` is set to point to a reverse proxy in the DCOS environment.
## How was this patch tested?
Locally, using unit tests, and on DCOS testing and stable revisions.
Author: Sergiusz Urbaniak <sur@mesosphere.io> Closes #11369 from s-urbaniak/sur-webui-url.
* [SPARK-13702][CORE][SQL][MLLIB] Use diamond operator for generic instance creation in Java code | Dongjoon Hyun | 2016-03-09 | 5 | -17/+14
## What changes were proposed in this pull request?
In order to make `docs/examples` (and other related code) more simple/readable/user-friendly, this PR replaces code like the following by using the `diamond` operator:
```
- final ArrayList<Product2<Object, Object>> dataToWrite =
-   new ArrayList<Product2<Object, Object>>();
+ final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
```
Java 7 or higher supports the **diamond** operator, which replaces the type arguments required to invoke the constructor of a generic class with an empty set of type parameters (<>). Currently, Spark's Java code mixes both usages.
## How was this patch tested?
Manual. Pass the existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11541 from dongjoon-hyun/SPARK-13702.
* [SPARK-13692][CORE][SQL] Fix trivial Coverity/Checkstyle defects | Dongjoon Hyun | 2016-03-09 | 1 | -2/+2
## What changes were proposed in this pull request?
This issue fixes the following potential bugs and Java coding style issues detected by Coverity and Checkstyle:
- Implement both null and type checking in equals functions.
- Fix wrong type casting logic in SimpleJavaBean2.equals.
- Add `implements Cloneable` to `UTF8String` and `SortedIterator`.
- Remove dereferencing before null check in `AbstractBytesToBytesMapSuite`.
- Fix coding style: add '{}' to single `for` statements in mllib examples.
- Remove unused imports in `ColumnarBatch` and `JavaKinesisStreamSuite`.
- Remove unused fields in `ChunkFetchIntegrationSuite`.
- Add `stop()` to prevent a resource leak.
Please note that the last two checkstyle errors exist on newly added commits after [SPARK-13583](https://issues.apache.org/jira/browse/SPARK-13583).
## How was this patch tested?
Manual via `./dev/lint-java` and the Coverity site.
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11530 from dongjoon-hyun/SPARK-13692.
* [SPARK-13695] Don't cache MEMORY_AND_DISK blocks as bytes in memory after spills | Josh Rosen | 2016-03-08 | 1 | -10/+20
When a cached block is spilled to disk and read back in serialized form (i.e. as bytes), the current BlockManager implementation will attempt to re-insert the serialized block into the MemoryStore even if the block's storage level requests deserialized caching. This behavior adds some complexity to the MemoryStore but I don't think it offers many performance benefits and I'd like to remove it in order to simplify a larger refactoring patch. Therefore, this patch changes the behavior so that disk store reads will only cache bytes in the memory store for blocks with serialized storage levels. There are two places where we request serialized bytes from the BlockStore:
1. getLocalBytes(), which is only called when reading local copies of TorrentBroadcast pieces. Broadcast pieces are always cached using a serialized storage level, so this won't lead to a mismatch in serialization forms if spilled bytes read from disk are cached as bytes in the memory store.
2. The non-shuffle-block branch in getBlockData(), which is only called by the NettyBlockRpcServer when responding to requests to read remote blocks. Caching the serialized bytes in memory will only benefit us if those cached bytes are read before they're evicted and the likelihood of that happening seems low since the frequency of remote reads of non-broadcast cached blocks seems very low. Caching these bytes when they have a low probability of being read is bad if it risks the eviction of blocks which are cached in their expected serialized/deserialized forms, since those blocks seem more likely to be read in local computation.
Given the argument above, I think this change is unlikely to cause performance regressions.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11533 from JoshRosen/remove-memorystore-level-mismatch.
* [SPARK-13659] Refactor BlockStore put*() APIs to remove returnValues | Josh Rosen | 2016-03-07 | 1 | -30/+22
In preparation for larger refactoring, this patch removes the confusing `returnValues` option from the BlockStore put() APIs: returning the value is only useful in one place (caching) and in other situations, such as block replication, it's simpler to put() and then get(). As part of this change, I needed to refactor `BlockManager.doPut()`'s block replication code. I also changed `doPut()` to access the memory and disk stores directly rather than calling them through the BlockStore interface; this is in anticipation of a followup patch to remove the BlockStore interface so that the disk store can expose a binary-data-oriented API which is not concerned with Java objects or serialization. These changes should be covered by the existing storage unit tests. The best way to review this patch is probably to look at the individual commits, all of which are small and have useful descriptions to guide the review. /cc davies for review.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11502 from JoshRosen/remove-returnvalues.
* [SPARK-529][CORE][YARN] Add type-safe config keys to SparkConf | Marcelo Vanzin | 2016-03-07 | 1 | -0/+155
This is, in a way, the basics to enable SPARK-529 (which was closed as won't-fix but I think is still valuable). In fact, Spark SQL created something for that, and this change basically factors out that code and inserts it into SparkConf, with some extra bells and whistles. To showcase the usage of this pattern, I modified the YARN backend to use the new config keys (defined in the new `config` package object under `o.a.s.deploy.yarn`). Most of the changes are mechanical, although logic had to be slightly modified in a handful of places.
Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #10205 from vanzin/conf-opts.
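The pattern reads roughly as follows (a sketch only: the builder is internal API, its exact package at the time of this change is assumed, and the key name and default below are made up for illustration):
```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.internal.config.ConfigBuilder

// Sketch of a typed key in the new `config` package object (key name and default
// are hypothetical, shown only to illustrate the builder pattern).
package object config {
  private[spark] val MAX_EXAMPLE_RETRIES = ConfigBuilder("spark.example.maxRetries")
    .doc("Hypothetical setting used to illustrate the builder.")
    .intConf
    .createWithDefault(3)
}

// Call sites then read a typed value, e.g. conf.get(MAX_EXAMPLE_RETRIES) returns Int,
// instead of repeating conf.getInt("spark.example.maxRetries", 3) at each use.
```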
* [MINOR] Fix typos in comments and testcase names | Dongjoon Hyun | 2016-03-03 | 1 | -2/+2
## What changes were proposed in this pull request?
This PR fixes typos in comments and testcase names.
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11481 from dongjoon-hyun/minor_fix_typos_in_code.
* [SPARK-13423][HOTFIX] Static analysis fixes for 2.x / fixed for Scala 2.10, again | Sean Owen | 2016-03-03 | 1 | -1/+1
## What changes were proposed in this pull request?
Fixes (another) compile problem due to inadvertent use of `Option.contains`, which exists only in Scala 2.11.
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com> Closes #11496 from srowen/SPARK-13423.3.
* [SPARK-13583][CORE][STREAMING] Remove unused imports and add checkstyle rule | Dongjoon Hyun | 2016-03-03 | 17 | -30/+10
## What changes were proposed in this pull request?
After SPARK-6990, `dev/lint-java` keeps Java code healthy and helps PR review by saving much time. This issue aims to remove unused imports from Java/Scala code and add an `UnusedImports` checkstyle rule to help developers.
## How was this patch tested?
```
./dev/lint-java
./build/sbt compile
```
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11438 from dongjoon-hyun/SPARK-13583.
* [SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x | Sean Owen | 2016-03-03 | 13 | -51/+50
## What changes were proposed in this pull request?
Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:
- Inner classes should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert vs junit.Assert
- get(a) + getOrElse(b) -> getOrElse(a, b)
- Array/String .size -> .length (occasionally -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == x) -> contains(x)
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_+_) -> sum
- map + flatten -> flatMap
The most controversial may be .size -> .length, simply because of the size of the change. It is intended to avoid implicits that might be expensive in some places.
## How was this patch tested?
Existing Jenkins unit tests.
Author: Sean Owen <sowen@cloudera.com> Closes #11292 from srowen/SPARK-13423.
* [SPARK-13621][CORE] TestExecutor.scala needs to be moved to test package | Devaraj K | 2016-03-02 | 1 | -0/+29
Moved TestExecutor.scala from src to the test package and removed the unused file TestClient.scala.
Author: Devaraj K <devaraj@apache.org> Closes #11474 from devaraj-kavali/SPARK-13621.
* [SPARK-13601] Call failure callbacks before writer.close() | Davies Liu | 2016-03-02 | 1 | -1/+90
## What changes were proposed in this pull request?
In order to tell the OutputStream whether the task has failed or not, we should call the failure callbacks BEFORE calling writer.close().
## How was this patch tested?
Added new unit tests.
Author: Davies Liu <davies@databricks.com> Closes #11450 from davies/callback.
* [SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager | Josh Rosen | 2016-03-02 | 4 | -239/+171
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication. Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method. This pull request replaces / subsumes #10748. /cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.
Author: Josh Rosen <joshrosen@databricks.com> Closes #11436 from JoshRosen/remove-cachemanager.
* [SPARK-12994][CORE] It is not necessary to create ExecutorAllocationManager in local mode | Jeff Zhang | 2016-02-29 | 1 | -0/+3
Author: Jeff Zhang <zjffdu@apache.org> Closes #10914 from zjffdu/SPARK-12994.
* [SPARK-13465] Add a task failure listener to TaskContext | Reynold Xin | 2016-02-26 | 3 | -40/+73
## What changes were proposed in this pull request?
TaskContext supports task completion callbacks, which get called regardless of task failures. However, there is no way for the listener to know if there was an error. This patch adds a new listener that gets called when a task fails.
## How was this patch tested?
New unit test case and integration test case covering the code path.
Author: Reynold Xin <rxin@databricks.com> Closes #11340 from rxin/SPARK-13465.
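A usage sketch of the new callback from inside a task (the logging is illustrative only):
```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("failure-listener"))
sc.parallelize(1 to 4, 2).foreachPartition { iter =>
  val ctx = TaskContext.get()
  // Existing behavior: runs on success and on failure.
  ctx.addTaskCompletionListener { c => println(s"task for partition ${c.partitionId()} finished") }
  // New: runs only when the task fails, and receives the error.
  ctx.addTaskFailureListener { (c, error) =>
    println(s"task for partition ${c.partitionId()} failed: ${error.getMessage}")
  }
  iter.foreach(_ => ())
}
```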