aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
Commit message (Collapse)AuthorAgeFilesLines
* [SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC ↵Dhruve Ashar2016-09-221-2/+3
| | | | | | | | | | | | | | | call time. ## What changes were proposed in this pull request? We are killing multiple executors together instead of iterating over expensive RPC calls to kill single executor. ## How was this patch tested? Executed sample spark job to observe executors being killed/removed with dynamic allocation enabled. Author: Dhruve Ashar <dashar@yahoo-inc.com> Author: Dhruve Ashar <dhruveashar@gmail.com> Closes #15152 from dhruve/impr/SPARK-17365.
* [SPARK-4563][CORE] Allow driver to advertise a different network address.Marcelo Vanzin2016-09-212-2/+4
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | The goal of this feature is to allow the Spark driver to run in an isolated environment, such as a docker container, and be able to use the host's port forwarding mechanism to be able to accept connections from the outside world. The change is restricted to the driver: there is no support for achieving the same thing on executors (or the YARN AM for that matter). Those still need full access to the outside world so that, for example, connections can be made to an executor's block manager. The core of the change is simple: add a new configuration that tells what's the address the driver should bind to, which can be different than the address it advertises to executors (spark.driver.host). Everything else is plumbing the new configuration where it's needed. To use the feature, the host starting the container needs to set up the driver's port range to fall into a range that is being forwarded; this required the block manager port to need a special configuration just for the driver, which falls back to the existing spark.blockManager.port when not set. This way, users can modify the driver settings without affecting the executors; it would theoretically be nice to also have different retry counts for driver and executors, but given that docker (at least) allows forwarding port ranges, we can probably live without that for now. Because of the nature of the feature it's kinda hard to add unit tests; I just added a simple one to make sure the configuration works. This was tested with a docker image running spark-shell with the following command: docker blah blah blah \ -p 38000-38100:38000-38100 \ [image] \ spark-shell \ --num-executors 3 \ --conf spark.shuffle.service.enabled=false \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.driver.host=[host's address] \ --conf spark.driver.port=38000 \ --conf spark.driver.blockManager.port=38020 \ --conf spark.ui.port=38040 Running on YARN; verified the driver works, executors start up and listen on ephemeral ports (instead of using the driver's config), and that caching and shuffling (without the shuffle service) works. Clicked through the UI to make sure all pages (including executor thread dumps) worked. Also tested apps without docker, and ran unit tests. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #15120 from vanzin/SPARK-4563.
* [SPARK-17110] Fix StreamCorruptionException in BlockManager.getRemoteValues()Josh Rosen2016-09-061-1/+2
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, then this would be cached as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass proper class tags in order to enable the same serializer to be used during deserialization, causing Java to be inappropriately used instead of Kryo, leading to the StreamCorruptedException. We already fixed a similar bug in #14311, which dealt with similar issues in block replication. Prior to that patch, it seems that we had no tests to ensure that block replication actually succeeded. Similarly, prior to this bug fix patch it looks like we had no tests to perform remote reads of cached data, which is why this bug was able to remain latent for so long. This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded in the `getOrElseUpdate` code path (which is used by `rdd.iterator`) ## How was this patch tested? Extended the caching tests in `DistributedSuite` to exercise the `getRemoteValues` path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed. Author: Josh Rosen <joshrosen@databricks.com> Closes #14952 from JoshRosen/SPARK-17110.
* [SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch'Xin Ren2016-08-171-0/+3
| | | | | | | | | | | | | | | | | | | | https://issues.apache.org/jira/browse/SPARK-17038 ## What changes were proposed in this pull request? StreamingSource's lastReceivedBatch_submissionTime, lastReceivedBatch_processingTimeStart, and lastReceivedBatch_processingTimeEnd all use data from lastCompletedBatch instead of lastReceivedBatch. In particular, this makes it impossible to match lastReceivedBatch_records with a batchID/submission time. This is apparent when looking at StreamingSource.scala, lines 89-94. ## How was this patch tested? Manually running unit tests on local laptop Author: Xin Ren <iamshrek@126.com> Closes #14681 from keypointt/SPARK-17038.
* [SPARK-16779][TRIVIAL] Avoid using postfix operators where they do not add ↵Holden Karau2016-08-081-1/+0
| | | | | | | | | | | | | | | | much and remove whitelisting ## What changes were proposed in this pull request? Avoid using postfix operation for command execution in SQLQuerySuite where it wasn't whitelisted and audit existing whitelistings removing postfix operators from most places. Some notable places where postfix operation remains is in the XML parsing & time units (seconds, millis, etc.) where it arguably can improve readability. ## How was this patch tested? Existing tests. Author: Holden Karau <holden@us.ibm.com> Closes #14407 from holdenk/SPARK-16779.
* [SPARK-16694][CORE] Use for/foreach rather than map for Unit expressions ↵Sean Owen2016-07-301-1/+1
| | | | | | | | | | | | | | | | whose side effects are required ## What changes were proposed in this pull request? Use foreach/for instead of map where operation requires execution of body, not actually defining a transformation ## How was this patch tested? Jenkins Author: Sean Owen <sowen@cloudera.com> Closes #14332 from srowen/SPARK-16694.
* [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size ↵Dhruve Ashar2016-07-261-1/+4
| | | | | | | | | | | | | | | | | configurable ## What changes were proposed in this pull request? This change adds a new configuration entry to specify the size of the spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") is set to a default to 10000. Note: I haven't currently documented the configuration entry. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer JIRA for more details. ## How was this patch tested? Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab. Author: Dhruve Ashar <dhruveashar@gmail.com> Closes #14269 from dhruve/bug/SPARK-15703.
* [SPARK-16722][TESTS] Fix a StreamingContext leak in StreamingContextSuite ↵Shixiong Zhu2016-07-251-3/+6
| | | | | | | | | | | | | | | | when eventually fails ## What changes were proposed in this pull request? This PR moves `ssc.stop()` into `finally` for `StreamingContextSuite.createValidCheckpoint` to avoid leaking a StreamingContext since leaking a StreamingContext will fail a lot of tests and make us hard to find the real failure one. ## How was this patch tested? Jenkins unit tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #14354 from zsxwing/ssc-leak.
* [SPARK-16120][STREAMING] getCurrentLogFiles in ReceiverSuite WAL generating ↵Ahmed Mahran2016-06-221-1/+1
| | | | | | | | | | | | | | | | | | and cleaning case uses external variable instead of the passed parameter ## What changes were proposed in this pull request? In `ReceiverSuite.scala`, in the test case "write ahead log - generating and cleaning", the inner method `getCurrentLogFiles` uses external variable `logDirectory1` instead of the passed parameter `logDirectory`. This PR fixes this by using the passed method argument instead of variable from the outer scope. ## How was this patch tested? The unit test was re-run and the output logs were checked for the correct paths used. tdas Author: Ahmed Mahran <ahmed.mahran@mashin.io> Closes #13825 from ahmed-mahran/b-receiver-suite-wal-gen-cln.
* [SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator APISean Owen2016-06-121-3/+3
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? - Deprecate old Java accumulator API; should use Scala now - Update Java tests and examples - Don't bother testing old accumulator API in Java 8 (too) - (fix a misspelling too) ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #13606 from srowen/SPARK-15086.
* [MINOR] Fix Typos 'an -> a'Zheng RuiFeng2016-06-062-2/+2
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? `an -> a` Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one. ## How was this patch tested? manual tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #13515 from zhengruifeng/an_a.
* [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcardmwws2016-05-111-0/+62
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? make StreamingContext.textFileStream support wildcard like /home/user/*/file ## How was this patch tested? I did manual test and added a new unit test case Author: mwws <wei.mao@intel.com> Author: unknown <maowei@maowei-MOBL.ccr.corp.intel.com> Closes #12752 from mwws/SPARK_FileStream.
* [MINOR][TEST][STREAMING] make "testDir" able to be claened after test.mwws2016-05-091-4/+4
| | | | | | | | | | | It's a minor bug in test case. `val testDir = null` will keep be `null` as it's immutable, so in finally block, nothing will be cleaned. Another `testDir` variable created in try block is only visible in try block. ## How was this patch tested? Run existing test case and passed. Author: mwws <wei.mao@intel.com> Closes #12999 from mwws/SPARK_MINOR.
* [SPARK-1239] Improve fetching of map output statusesThomas Graves2016-05-061-1/+3
| | | | | | | | | | | | | | | | | | | | | | | | | | | The main issue we are trying to solve is the memory bloat of the Driver when tasks request the map output statuses. This means with a large number of tasks you either need a huge amount of memory on Driver or you have to repartition to smaller number. This makes it really difficult to run over say 50000 tasks. The main issues that cause the memory bloat are: 1) no flow control on sending the map output status responses. We serialize the map status output and then hand off to netty to send. netty is sending asynchronously and it can't send them fast enough to keep up with incoming requests so we end up with lots of copies of the serialized map output statuses sitting there and this causes huge bloat when you have 10's of thousands of tasks and map output status is in the 10's of MB. 2) When initial reduce tasks are started up, they all request the map output statuses from the Driver. These requests are handled by multiple threads in parallel so even though we check to see if we have a cached version, initially when we don't have a cached version yet, many of initial requests can all end up serializing the exact same map output statuses. This patch does a couple of things: - When the map output status size is over a threshold (default 512K) then it uses broadcast to send the map statuses. This means we no longer serialize a large map output status and thus we don't have issues with memory bloat. the messages sizes are now in the 300-400 byte range and the map status output are broadcast. If its under the threadshold it sends it as before, the message contains the DIRECT indicator now. - synchronize the incoming requests to allow one thread to cache the serialized output and broadcast the map output status that can then be used by everyone else. This ensures we don't create multiple broadcast variables when we don't need to. To ensure this happens I added a second thread pool which the Dispatcher hands the requests to so that those threads can block without blocking the main dispatcher threads (which would cause things like heartbeats and such not to come through) Note that some of design and code was contributed by mridulm ## How was this patch tested? Unit tests and a lot of manually testing. Ran with akka and netty rpc. Ran with both dynamic allocation on and off. one of the large jobs I used to test this was a join of 15TB of data. it had 200,000 map tasks, and 20,000 reduce tasks. Executors ranged from 200 to 2000. This job ran successfully with 5GB of memory on the driver with these changes. Without these changes I was using 20GB and only had 500 reduce tasks. The job has 50mb of serialized map output statuses and took roughly the same amount of time for the executors to get the map output statuses as before. Ran a variety of other jobs, from large wordcounts to small ones not using broadcasts. Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com> Closes #12113 from tgravescs/SPARK-1239.
* Revert "[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore ↵Josh Rosen2016-04-191-26/+23
| | | | | | BlockManager put errors" This reverts commit ed2de0299a5a54b566b91ae9f47b6626c484c1d3.
* [SPARK-14676] Wrap and re-throw Await.result exceptions in order to capture ↵Josh Rosen2016-04-191-2/+3
| | | | | | | | | | | | | | | | full stacktrace When `Await.result` throws an exception which originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread. This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`. I tested this manually using https://github.com/JoshRosen/spark/commit/16b31c825197ee31a50214c6ba3c1df08148f403, a patch which reproduces an issue where an RPC exception which occurs while unpersisting RDDs manages to crash the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR. /cc rxin nongli yhuai anabranch Author: Josh Rosen <joshrosen@databricks.com> Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions.
* [SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put ↵Josh Rosen2016-04-181-23/+26
| | | | | | | | | | | | | | | | | | | errors WriteAheadLogBasedBlockHandler will currently throw exceptions if its BlockManager `put()` calls fail, even though those calls are only performed as a performance optimization. Instead, it should log and ignore exceptions during that `put()`. This is a longstanding issue that was masked by an incorrect test case. I think that we haven't noticed this in production because 1. most people probably use a `MEMORY_AND_DISK` storage level, and 2. typically, individual blocks may be small enough relative to the total storage memory such that they're able to evict blocks from previous batches, so `put()` failures here may be rare in practice. This patch fixes the faulty test and fixes the bug. /cc tdas Author: Josh Rosen <joshrosen@databricks.com> Closes #12484 from JoshRosen/received-block-hadndler-fix.
* [SPARK-14667] Remove HashShuffleManagerReynold Xin2016-04-181-2/+2
| | | | | | | | | | | | ## What changes were proposed in this pull request? The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager. ## How was this patch tested? Removed some tests related to the old manager. Author: Reynold Xin <rxin@databricks.com> Closes #12423 from rxin/SPARK-14667.
* [SPARK-14544] [SQL] improve performance of SQL UI tabDavies Liu2016-04-121-2/+2
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR improve the performance of SQL UI by: 1) remove the details column in all executions page (the first page in SQL tab). We can check the details by enter the execution page. 2) break-all is super slow in Chrome recently, so switch to break-word. 3) Using "display: none" to hide a block. 4) using one js closure for for all the executions, not one for each. 5) remove the height limitation of details, don't need to scroll it in the tiny window. ## How was this patch tested? Exists tests. ![ui](https://cloud.githubusercontent.com/assets/40902/14445712/68d7b258-0004-11e6-9b48-5d329b05d165.png) Author: Davies Liu <davies@databricks.com> Closes #12311 from davies/ui_perf.
* [SPARK-14508][BUILD] Add a new ScalaStyle Rule `OmitBracesInCase`Dongjoon Hyun2016-04-123-8/+4
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we had better enforce the following rule. ``` case: Always omit braces in case clauses. ``` This PR makes a new ScalaStyle rule, 'OmitBracesInCase', and enforces it to the code. ## How was this patch tested? Pass the Jenkins tests (including Scala style checking) Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12280 from dongjoon-hyun/SPARK-14508.
* [SPARK-14475] Propagate user-defined context from driver to executorsEric Liang2016-04-111-1/+9
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This adds a new API call `TaskContext.getLocalProperty` for getting properties set in the driver from executors. These local properties are automatically propagated from the driver to executors. For streaming, the context for streaming tasks will be the initial driver context when ssc.start() is called. ## How was this patch tested? Unit tests. cc JoshRosen Author: Eric Liang <ekl@databricks.com> Closes #12248 from ericl/sc-2813.
* [SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in ↵jerryshao2016-04-091-1/+22
| | | | | | | | | | | | | | | | | | receiver-less scenario ## What changes were proposed in this pull request? When calling `ReceiverTracker#allocatedExecutors` in receiver-less scenario, NPE will be thrown, since this `ReceiverTracker` actually is not started and `endpoint` is not created. This will be happened when playing streaming dynamic allocation with direct Kafka. ## How was this patch tested? Local integrated test is done. Author: jerryshao <sshao@hortonworks.com> Closes #12236 from jerryshao/SPARK-14455.
* [SPARK-14437][CORE] Use the address that NettyBlockTransferService listens ↵Shixiong Zhu2016-04-081-1/+1
| | | | | | | | | | | | | | | | | | | to create BlockManagerId ## What changes were proposed in this pull request? Here is why SPARK-14437 happens: BlockManagerId is created using NettyBlockTransferService.hostName which comes from `customHostname`. And `Executor` will set `customHostname` to the hostname which is detected by the driver. However, the driver may not be able to detect the correct address in some complicated network (Netty's Channel.remoteAddress doesn't always return a connectable address). In such case, `BlockManagerId` will be created using a wrong hostname. To fix this issue, this PR uses `hostname` provided by `SparkEnv.create` to create `NettyBlockTransferService` and set `NettyBlockTransferService.hostname` to this one directly. A bonus of this approach is NettyBlockTransferService won't bound to `0.0.0.0` which is much safer. ## How was this patch tested? Manually checked the bound address using local-cluster. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12240 from zsxwing/SPARK-14437.
* [SPARK-14134][CORE] Change the package name used for shading classes.Marcelo Vanzin2016-04-061-1/+1
| | | | | | | | | | | | | | | The current package name uses a dash, which is a little weird but seemed to work. That is, until a new test tried to mock a class that references one of those shaded types, and then things started failing. Most changes are just noise to fix the logging configs. For reference, SPARK-8815 also raised this issue, although at the time it did not cause any issues in Spark, so it was not addressed. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #11941 from vanzin/SPARK-14134.
* [SPARK-12133][STREAMING] Streaming dynamic allocationTathagata Das2016-04-061-0/+395
| | | | | | | | | | | | | ## What changes were proposed in this pull request? Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation. ## How was this patch tested Unit tests, and cluster tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #12154 from tdas/streaming-dynamic-allocation.
* [SPARK-13211][STREAMING] StreamingContext throws NoSuchElementException when ↵Sean Owen2016-04-051-0/+5
| | | | | | | | | | | | | | | | created from non-existent checkpoint directory ## What changes were proposed in this pull request? Take 2: avoid None.get NoSuchElementException in favor of more descriptive IllegalArgumentException if a non-existent checkpoint dir is used without a SparkContext ## How was this patch tested? Jenkins test plus new test for this particular case Author: Sean Owen <sowen@cloudera.com> Closes #12174 from srowen/SPARK-13211.
* [SPARK-13992] Add support for off-heap cachingJosh Rosen2016-04-011-1/+2
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | This patch adds support for caching blocks in the executor processes using direct / off-heap memory. ## User-facing changes **Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication. **Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap. **Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction. **Memory management policies:** the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes. **Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` affect the default storage level for cached SQL tables. ## Internal changes - Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream` - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays. - Its constructor now accept an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers. - Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory. - The `MemoryStore`'s MemoryEntries now tracks whether blocks are stored on- or off-heap. - `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa). - Make sure that off-heap buffers are properly de-allocated during MemoryStore eviction. - The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit. Author: Josh Rosen <joshrosen@databricks.com> Closes #11805 from JoshRosen/off-heap-caching.
* [SPARK-14075] Refactor MemoryStore to be testable independent of BlockManagerJosh Rosen2016-03-232-8/+12
| | | | | | | | | | | | | This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`. - The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`. - `BlockInfoManager `is now passed directly to classes that need it, rather than being passed via the `BlockManager`. - The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests. - Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`. Author: Josh Rosen <joshrosen@databricks.com> Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
* [SPARK-13990] Automatically pick serializer when caching RDDsJosh Rosen2016-03-211-3/+5
| | | | | | | | | | | | Building on the `SerializerManager` introduced in SPARK-13926/ #11755, this patch Spark modifies Spark's BlockManager to use RDD's ClassTags in order to select the best serializer to use when caching RDD blocks. When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and stores those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager. There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit. Author: Josh Rosen <joshrosen@databricks.com> Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
* [SPARK-14011][CORE][SQL] Enable `LineLength` Java checkstyle ruleDongjoon Hyun2016-03-212-38/+81
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? [Spark Coding Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) has 100-character limit on lines, but it's disabled for Java since 11/09/15. This PR enables **LineLength** checkstyle again. To help that, this also introduces **RedundantImport** and **RedundantModifier**, too. The following is the diff on `checkstyle.xml`. ```xml - <!-- TODO: 11/09/15 disabled - the lengths are currently > 100 in many places --> - <!-- <module name="LineLength"> <property name="max" value="100"/> <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/> </module> - --> <module name="NoLineWrap"/> <module name="EmptyBlock"> <property name="option" value="TEXT"/> -167,5 +164,7 </module> <module name="CommentsIndentation"/> <module name="UnusedImports"/> + <module name="RedundantImport"/> + <module name="RedundantModifier"/> ``` ## How was this patch tested? Currently, `lint-java` is disabled in Jenkins. It needs a manual test. After passing the Jenkins tests, `dev/lint-java` should passes locally. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11831 from dongjoon-hyun/SPARK-14011.
* [SPARK-13921] Store serialized blocks as multiple chunks in MemoryStoreJosh Rosen2016-03-172-2/+2
| | | | | | | | | | | | This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer. This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted. This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not bee implemented yet). Author: Josh Rosen <joshrosen@databricks.com> Closes #11748 from JoshRosen/chunked-block-serialization.
* [SPARK-13928] Move org.apache.spark.Logging into ↵Wenchen Fan2016-03-178-5/+10
| | | | | | | | | | | | | | | | org.apache.spark.internal.Logging ## What changes were proposed in this pull request? Logging was made private in Spark 2.0. If we move it, then users would be able to create a Logging trait themselves to avoid changing their own code. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #11764 from cloud-fan/logger.
* [SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset ↵Sean Owen2016-03-162-2/+3
| | | | | | | | | | | | | | | | | | | | follow up ## What changes were proposed in this pull request? Follow up to https://github.com/apache/spark/pull/11657 - Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8` - And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests) - And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #11725 from srowen/SPARK-13823.2.
* [MINOR][DOCS] Fix more typos in comments/strings.Dongjoon Hyun2016-03-146-8/+8
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR fixes 135 typos over 107 files: * 121 typos in comments * 11 typos in testcase name * 3 typos in log messages ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11689 from dongjoon-hyun/fix_more_typos.
* [SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> ↵Sean Owen2016-03-135-13/+16
| | | | | | | | | | | | | | | | | | | | byte[] conversions (and remaining Coverity items) ## What changes were proposed in this pull request? - Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8 - Same for `InputStreamReader` and `OutputStreamWriter` constructors - Standardizes on UTF-8 everywhere - Standardizes specifying the encoding with `StandardCharsets.UTF-8`, not the Guava constant or "UTF-8" (which means handling `UnuspportedEncodingException`) - (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c ) ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #11657 from srowen/SPARK-13823.
* [SPARK-3854][BUILD] Scala style: require spaces before `{`.Dongjoon Hyun2016-03-101-3/+3
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Since the opening curly brace, '{', has many usages as discussed in [SPARK-3854](https://issues.apache.org/jira/browse/SPARK-3854), this PR adds a ScalaStyle rule to prevent '){' pattern for the following majority pattern and fixes the code accordingly. If we enforce this in ScalaStyle from now, it will improve the Scala code quality and reduce review time. ``` // Correct: if (true) { println("Wow!") } // Incorrect: if (true){ println("Wow!") } ``` IntelliJ also shows new warnings based on this. ## How was this patch tested? Pass the Jenkins ScalaStyle test. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11637 from dongjoon-hyun/SPARK-3854.
* [SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk ↵Josh Rosen2016-03-101-2/+8
| | | | | | | | | | | | | | stores Today, both the MemoryStore and DiskStore implement a common `BlockStore` API, but I feel that this API is inappropriate because it abstracts away important distinctions between the behavior of these two stores. For instance, the disk store doesn't have a notion of storing deserialized objects, so it's confusing for it to expose object-based APIs like putIterator() and getValues() instead of only exposing binary APIs and pushing the responsibilities of serialization and deserialization to the client. Similarly, the DiskStore put() methods accepted a `StorageLevel` parameter even though the disk store can only store blocks in one form. As part of a larger BlockManager interface cleanup, this patch remove the BlockStore interface and refines the MemoryStore and DiskStore interfaces to reflect more narrow sets of responsibilities for those components. Some of the benefits of this interface cleanup are reflected in simplifications to several unit tests to eliminate now-unnecessary mocking, significant simplification of the BlockManager's `getLocal()` and `doPut()` methods, and a narrower API between the MemoryStore and DiskStore. Author: Josh Rosen <joshrosen@databricks.com> Closes #11534 from JoshRosen/remove-blockstore-interface.
* [SPARK-7420][STREAMING][TESTS] Enable test: ↵proflin2016-03-091-2/+1
| | | | | | | | | | | | o.a.s.streaming.JobGeneratorSuite "Do not clear received… ## How was this patch tested? unit test Author: proflin <proflin.me@gmail.com> Closes #11626 from lw-lin/SPARK-7420.
* [SPARK-13693][STREAMING][TESTS] Stop StreamingContext before deleting ↵Shixiong Zhu2016-03-051-1/+1
| | | | | | | | | | | | | | | | | | checkpoint dir ## What changes were proposed in this pull request? Stop StreamingContext before deleting checkpoint dir to avoid the race condition that deleting the checkpoint dir and writing checkpoint happen at the same time. The flaky test log is here: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/256/testReport/junit/org.apache.spark.streaming/MapWithStateSuite/_It_is_not_a_test_/ ## How was this patch tested? unit tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #11531 from zsxwing/SPARK-13693.
* [SPARK-13398][STREAMING] Move away from thread pool task support to forkjoinHolden Karau2016-03-041-3/+6
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Remove old deprecated ThreadPoolExecutor and replace with ExecutionContext using a ForkJoinPool. The downside of this is that scala's ForkJoinPool doesn't give us a way to specify the thread pool name (and is a wrapper of Java's in 2.12) except by providing a custom factory. Note that we can't use Java's ForkJoinPool directly in Scala 2.11 since it uses a ExecutionContext which reports system parallelism. One other implicit change that happens is the old ExecutionContext would have reported a different default parallelism since it used system parallelism rather than threadpool parallelism (this was likely not intended but also likely not a huge difference). The previous version of this PR attempted to use an execution context constructed on the ThreadPool (but not the deprecated ThreadPoolExecutor class) so as to keep the ability to have human readable named threads but this reported system parallelism. ## How was this patch tested? unit tests: streaming/testOnly org.apache.spark.streaming.util.* Author: Holden Karau <holden@us.ibm.com> Closes #11423 from holdenk/SPARK-13398-move-away-from-ThreadPoolTaskSupport-java-forkjoin.
* [SPARK-13583][CORE][STREAMING] Remove unused imports and add checkstyle ruleDongjoon Hyun2016-03-032-3/+1
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? After SPARK-6990, `dev/lint-java` keeps Java code healthy and helps PR review by saving much time. This issue aims remove unused imports from Java/Scala code and add `UnusedImports` checkstyle rule to help developers. ## How was this patch tested? ``` ./dev/lint-java ./build/sbt compile ``` Author: Dongjoon Hyun <dongjoon@apache.org> Closes #11438 from dongjoon-hyun/SPARK-13583.
* [SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.xSean Owen2016-03-034-10/+10
| | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly: - Inner class should be static - Mismatched hashCode/equals - Overflow in compareTo - Unchecked warnings - Misuse of assert, vs junit.assert - get(a) + getOrElse(b) -> getOrElse(a,b) - Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions - Dead code - tailrec - exists(_ == ) -> contains find + nonEmpty -> exists filter + size -> count - reduce(_+_) -> sum map + flatten -> map The most controversial may be .size -> .length simply because of its size. It is intended to avoid implicits that might be expensive in some places. ## How was the this patch tested? Existing Jenkins unit tests. Author: Sean Owen <sowen@cloudera.com> Closes #11292 from srowen/SPARK-13423.
* [SPARK-13399][STREAMING] Fix checkpointsuite type erasure warningsHolden Karau2016-02-221-6/+13
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Change the checkpointsuite getting the outputstreams to explicitly be unchecked on the generic type so as to avoid the warnings. This only impacts test code. Alternatively we could encode the type tag in the TestOutputStreamWithPartitions and filter the type tag as well - but this is unnecessary since multiple testoutputstreams are not registered and the previous code was not actually checking this type. ## How was the this patch tested? unit tests (streaming/testOnly org.apache.spark.streaming.CheckpointSuite) Author: Holden Karau <holden@us.ibm.com> Closes #11286 from holdenk/SPARK-13399-checkpointsuite-type-erasure.
* [SPARK-13186][STREAMING] migrate away from SynchronizedMapHuaxin Gao2016-02-222-4/+11
| | | | | | | | trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead. Author: Huaxin Gao <huaxing@us.ibm.com> Closes #11250 from huaxingao/spark__13186.
* [SPARK-13248][STREAMING] Remove deprecated Streaming APIs.Luciano Resende2016-02-211-7/+0
| | | | | | | | Remove deprecated Streaming APIs and adjust sample applications. Author: Luciano Resende <lresende@apache.org> Closes #11139 from lresende/streaming-deprecated-apis.
* [STREAMING][TEST] Fix flaky streaming.FailureSuiteTathagata Das2016-02-112-2/+6
| | | | | | | | | | Under some corner cases, the test suite failed to shutdown the SparkContext causing cascaded failures. This fix does two things - Makes sure no SparkContext is active after every test - Makes sure StreamingContext is always shutdown (prevents leaking of StreamingContexts as well, just in case) Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #11166 from tdas/fix-failuresuite.
* [SPARK-13170][STREAMING] Investigate replacing SynchronizedQueue as it is ↵Sean Owen2016-02-091-13/+24
| | | | | | | | | | deprecated Replace SynchronizeQueue with synchronized access to a Queue Author: Sean Owen <sowen@cloudera.com> Closes #11111 from srowen/SPARK-13170.
* [SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streamingHolden Karau2016-02-0911-157/+165
| | | | | | | | | | | | Building with Scala 2.11 results in the warning trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative - we already use ConcurrentLinkedQueue elsewhere so lets replace it. Some notes about how behaviour is different for reviewers: The Seq from a SynchronizedBuffer that was implicitly converted would continue to receive updates - however when we do the same conversion explicitly on the ConcurrentLinkedQueue this isn't the case. Hence changing some of the (internal & test) APIs to pass an Iterable. toSeq is safe to use if there are no more updates. Author: Holden Karau <holden@us.ibm.com> Author: tedyu <yuzhihong@gmail.com> Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
* [SPARK-13195][STREAMING] Fix NoSuchElementException when a state is not set ↵Shixiong Zhu2016-02-041-0/+5
| | | | | | | | | | but timeoutThreshold is defined Check the state Existence before calling get. Author: Shixiong Zhu <shixiong@databricks.com> Closes #11081 from zsxwing/SPARK-13195.
* [SPARK-12739][STREAMING] Details of batch in Streaming tab uses two Duration ↵Mario Briggs2016-02-031-2/+3
| | | | | | | | | | | columns I have clearly prefix the two 'Duration' columns in 'Details of Batch' Streaming tab as 'Output Op Duration' and 'Job Duration' Author: Mario Briggs <mario.briggs@in.ibm.com> Author: mariobriggs <mariobriggs@in.ibm.com> Closes #11022 from mariobriggs/spark-12739.