path: root/streaming/src/test
Commit message | Author | Age | Files | Lines
...
* [SPARK-11791] Fix flaky test in BatchedWriteAheadLogSuite | Burak Yavuz | 2015-11-18 | 1 | -4/+8
Stack trace of the failure:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 62 times over 1.006322071 seconds. Last failure message: Argument(s) are different! Wanted:
writeAheadLog.write( java.nio.HeapByteBuffer[pos=0 lim=124 cap=124], 10 );
-> at org.apache.spark.streaming.util.BatchedWriteAheadLogSuite$$anonfun$23$$anonfun$apply$mcV$sp$15.apply(WriteAheadLogSuite.scala:518)
Actual invocation has different arguments:
writeAheadLog.write( java.nio.HeapByteBuffer[pos=0 lim=124 cap=124], 10 );
-> at org.apache.spark.streaming.util.WriteAheadLogSuite$BlockingWriteAheadLog.write(WriteAheadLogSuite.scala:756)
```
I believe the issue was that, due to a race condition, the ordering of the events could be messed up in the final ByteBuffer, so the comparison fails. By adding eventually between the requests, we make sure the ordering is preserved. Note that in real-life situations, the ordering across threads will not matter. Another solution would be to implement a custom Mockito matcher that sorts and then compares the results, but that kind of sounds like overkill to me. Let me know what you think tdas zsxwing Author: Burak Yavuz <brkyvz@gmail.com> Closes #9790 from brkyvz/fix-flaky-2.
* [SPARK-11814][STREAMING] Add better default checkpoint duration | Tathagata Das | 2015-11-18 | 1 | -1/+43
The DStream checkpoint interval is by default set to max(10 seconds, batch interval). That's bad for large batch intervals, where the checkpoint interval = batch interval and RDDs get checkpointed every batch. This PR sets the checkpoint interval of trackStateByKey to 10 * batch duration. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #9805 from tdas/SPARK-11814.
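A minimal Scala sketch of the two policies described in SPARK-11814 above; the helper names are illustrative, not Spark internals:
```
import org.apache.spark.streaming.{Duration, Seconds}

// Old DStream rule: checkpoint at max(10 seconds, batch interval).
def dstreamDefaultCheckpoint(batch: Duration): Duration =
  if (batch > Seconds(10)) batch else Seconds(10)

// New trackStateByKey rule: checkpoint every 10 batches.
def trackStateDefaultCheckpoint(batch: Duration): Duration = batch * 10

// With a 30 s batch interval, the old rule checkpoints every batch (30 s),
// while the new rule checkpoints every 10 batches (300 s).
```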
* [SPARK-11495] Fix potential socket / file handle leaks that were found via static analysis | Josh Rosen | 2015-11-18 | 1 | -7/+13
The HP Fortify Open Source Review team (https://www.hpfod.com/open-source-review-project) reported a handful of potential resource leaks that were discovered using their static analysis tool. We should fix the issues identified by their scan. Author: Josh Rosen <joshrosen@databricks.com> Closes #9455 from JoshRosen/fix-potential-resource-leaks.
* [SPARK-4557][STREAMING] Spark Streaming foreachRDD Java API method should accept a VoidFunction<...> | Bryan Cutler | 2015-11-18 | 1 | -1/+40
Currently the streaming foreachRDD Java API uses a function prototype that requires a return value of null. This PR deprecates the old method and uses VoidFunction to allow for a more concise declaration. Also added VoidFunction2 to the Java API in order to use it in Streaming methods. A unit test is added for using foreachRDD with VoidFunction, and the changes have been tested with Java 7 and Java 8 using lambdas. Author: Bryan Cutler <bjcutler@us.ibm.com> Closes #9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.
* [SPARK-11761] Prevent the call to StreamingContext#stop() in the listener bus's thread | tedyu | 2015-11-17 | 1 | -0/+34
See the discussion toward the tail of https://github.com/apache/spark/pull/9723. From zsxwing:
```
The user should not call stop or other long-time work in a listener since it will block the listener thread, and prevent from stopping SparkContext/StreamingContext. I cannot see an approach since we need to stop the listener bus's thread before stopping SparkContext/StreamingContext totally.
```
The proposed solution is to prevent the call to StreamingContext#stop() in the listener bus's thread. Author: tedyu <yuzhihong@gmail.com> Closes #9741 from tedyu/master.
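An illustrative-only sketch of the kind of guard this change describes; the class and method names are hypothetical, not the actual StreamingContext internals:
```
// Refuse a blocking stop() issued from the listener bus's own dispatch thread,
// since the bus cannot be stopped from inside its own event loop.
class ListenerBusGuard(listenerThread: Thread) {
  def assertNotListenerThread(operation: String): Unit = {
    if (Thread.currentThread() eq listenerThread) {
      throw new IllegalStateException(
        s"Cannot call $operation from the listener bus thread")
    }
  }
}
```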
* [SPARK-11740][STREAMING] Fix the race condition of two checkpoints in a batch | Shixiong Zhu | 2015-11-17 | 1 | -2/+25
We do a checkpoint when generating a batch and when completing a batch. When the processing time of a batch is greater than the batch interval, checkpointing for completing an old batch may run after checkpointing for generating a new batch. If this happens, the checkpoint of the old batch actually has the latest information, so we want to recover from it. This PR uses the latest checkpoint time as the file name, so that we can always recover from the latest checkpoint file. Author: Shixiong Zhu <shixiong@databricks.com> Closes #9707 from zsxwing/fix-checkpoint.
* [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default | Burak Yavuz | 2015-11-16 | 4 | -6/+47
Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions:
- batching on by default
- closeFileAfterWrite off by default
- parallelRecovery off by default
Author: Burak Yavuz <brkyvz@gmail.com> Closes #9695 from brkyvz/enable-batch-wal.
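A sketch of overriding the defaults listed above from a SparkConf; the configuration keys are assumed from the Spark Streaming write-ahead-log settings of this era:
```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Driver-side batching is now on by default; set to "false" to opt out.
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
  // closeFileAfterWrite stays off by default; enable it only for stores like S3.
  .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
```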
* [SPARK-11573] Correct 'reflective access of structural type member method should be enabled' Scala warnings | Gábor Lipták | 2015-11-14 | 1 | -0/+1
Author: Gábor Lipták <gliptak@gmail.com> Closes #9550 from gliptak/SPARK-11573.
* [SPARK-11681][STREAMING] Correctly update state timestamp even when state is not updated | Tathagata Das | 2015-11-12 | 1 | -5/+131
Bug: the timestamp is not updated if there is data but the corresponding state is not updated. This is wrong, as timeout is defined as "no data for a while", not "no state update for a while". Fix: update the timestamp when a timeout is specified, otherwise there is no need. Also refactored the code for better testability and added unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #9648 from tdas/SPARK-11681.
* [SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor recovery tweaks | Burak Yavuz | 2015-11-12 | 2 | -6/+172
The support for closing WriteAheadLog files after writes was just merged in. Closing every file after a write is a very expensive operation, as it creates many small files on S3. It's not necessary to enable it on HDFS anyway. However, when you have many small files on S3, recovery takes very long. In addition, files start stacking up pretty quickly, and deletes may not be able to keep up, therefore deletes can also be parallelized. This PR adds support for the two parallelization steps mentioned above, in addition to fixes for a couple more failures I encountered during recovery. Author: Burak Yavuz <brkyvz@gmail.com> Closes #9373 from brkyvz/par-recovery.
* [SPARK-11663][STREAMING] Add Java API for trackStateByKey | Shixiong Zhu | 2015-11-12 | 1 | -0/+210
TODO:
- [x] Add Java API
- [x] Add API tests
- [x] Add a function test
Author: Shixiong Zhu <shixiong@databricks.com> Closes #9636 from zsxwing/java-track.
* [SPARK-11290][STREAMING][TEST-MAVEN] Fix the test for maven build | Shixiong Zhu | 2015-11-12 | 1 | -3/+9
Should not create a SparkContext in the constructor of `TrackStateRDDSuite`. This is a follow-up PR for #9256 to fix the test for the maven build. Author: Shixiong Zhu <shixiong@databricks.com> Closes #9668 from zsxwing/hotfix.
* [SPARK-11639][STREAMING][FLAKY-TEST] Implement BlockingWriteAheadLog for testing the BatchedWriteAheadLog | Burak Yavuz | 2015-11-11 | 1 | -47/+77
Several elements could be drained if the main thread is not fast enough. zsxwing warned me about a similar problem, but I missed it here :( Submitting the fix using a waiter. cc tdas Author: Burak Yavuz <brkyvz@gmail.com> Closes #9605 from brkyvz/fix-flaky-test.
* [SPARK-11290][STREAMING] Basic implementation of trackStateByKey | Tathagata Das | 2015-11-10 | 3 | -0/+1001
The current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an updateFunction. The updateFunction is called for each key, and it uses new data and the existing state of the key to generate an updated state. However, based on community feedback, we have learnt the following lessons:
* Need for more optimized state management that does not scan every key
* Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state
The high-level idea of this PR:
* Introduce a new API trackStateByKey that allows the user to update per-key state and emit arbitrary records. The new API is necessary as this will have significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts.
* Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will look up the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the update data and, if necessary, with old data.
Here is the detailed design doc. Please take a look and provide feedback as comments. https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em
This is still WIP. Major things left to be done:
- [x] Implement basic functionality of state tracking, with initial RDD and timeouts
- [x] Unit tests for state tracking
- [x] Unit tests for initial RDD and timeout
- [ ] Unit tests for TrackStateRDD
  - [x] state creating, updating, removing
  - [ ] emitting
  - [ ] checkpointing
- [x] Misc unit tests for State, TrackStateSpec, etc.
- [x] Update docs and experimental tags
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #9256 from tdas/trackStateByKey.
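A conceptual sketch of the trackStateByKey semantics described above, written against plain Scala collections rather than the (then still evolving) Spark API: only keys seen in the new batch are touched, and arbitrary records are emitted alongside the updated per-key state:
```
// Per-key running counts: update only the touched keys, emit (key, newCount).
def trackBatch(
    state: Map[String, Int],
    batch: Seq[(String, Int)]): (Map[String, Int], Seq[(String, Int)]) =
  batch.foldLeft((state, Seq.empty[(String, Int)])) {
    case ((st, emitted), (key, value)) =>
      val newCount = st.getOrElse(key, 0) + value
      (st.updated(key, newCount), emitted :+ (key -> newCount))
  }
```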
* [SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz | Tathagata Das | 2015-11-10 | 2 | -14/+65
Currently, when a DStream sets the scope for an RDD generated by it, that scope is not allowed to be overridden by the RDD operations. So in the case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope, `foreachRDD <time>`, as set by the `ForeachDStream`. So it is hard to debug generated RDDs in the RDD DAG viz in the Spark UI. This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the earlier DStream scope. I have also slightly tweaked how callsites are set, such that the short callsite reflects the RDD operation name and line number. This tweak is necessary as callsites are not managed through scopes (which support nesting and overriding) and I didn't want to add another local property to control nesting and overriding of callsites.
## Before:
![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png)
## After:
![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png)
The code that was used to generate this is:
```
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
  val temp = rdd.map { _ -> 1 }.reduceByKey(_ + _)
  val temp2 = temp.map { _ -> 1 }.reduceByKey(_ + _)
  val count = temp2.count
  println(count)
}
```
Note:
- The inner scopes of the RDD operations map/reduceByKey inside foreachRDD are visible.
- The short callsites of stages refer to the line number of the RDD ops rather than the same line number of foreachRDD in all three cases.
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #9315 from tdas/SPARK-11361.
* [SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI | Shixiong Zhu | 2015-11-09 | 3 | -5/+12
Expose executorId to `ReceiverInfo` and the UI, since it's helpful when there are multiple executors running on the same host. Screenshot: <img width="1058" alt="screen shot 2015-11-02 at 10 52 19 am" src="https://cloud.githubusercontent.com/assets/1000778/10890968/2e2f5512-8150-11e5-8d9d-746e826b69e8.png"> Author: Shixiong Zhu <shixiong@databricks.com> Author: zsxwing <zsxwing@gmail.com> Closes #9418 from zsxwing/SPARK-11333.
* [SPARK-11462][STREAMING] Add JavaStreamingListener | zsxwing | 2015-11-09 | 2 | -0/+375
Currently, StreamingListener is not Java-friendly because it exposes some Scala collections to Java users directly, such as Option and Map. This PR adds a Java version of StreamingListener and a bunch of Java-friendly classes for Java users. Author: zsxwing <zsxwing@gmail.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9420 from zsxwing/java-streaming-listener.
* [SPARK-11141][STREAMING] Batch ReceivedBlockTrackerLogEvents for WAL writes | Burak Yavuz | 2015-11-09 | 2 | -167/+461
When using S3 as a directory for WALs, the writes take too long. The driver gets very easily bottlenecked when multiple receivers send AddBlock events to the ReceiverTracker. This PR adds batching of events in the ReceivedBlockTracker so that receivers don't get blocked by the driver for too long. cc zsxwing tdas Author: Burak Yavuz <brkyvz@gmail.com> Closes #9143 from brkyvz/batch-wal-writes.
* [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used | Shixiong Zhu | 2015-11-06 | 1 | -0/+16
Just ignore `InputDStream`s that have a null `rememberDuration` in `DStreamGraph.getMaxInputStreamRememberDuration`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #9476 from zsxwing/SPARK-11511.
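A minimal sketch of the fix described above; the real logic lives in DStreamGraph.getMaxInputStreamRememberDuration, and the helper name here is illustrative:
```
import org.apache.spark.streaming.Duration

// Drop input streams whose rememberDuration is still null before taking the max.
def maxRememberDuration(durations: Seq[Duration]): Option[Duration] =
  durations.filter(_ != null) match {
    case Seq() => None
    case ds    => Some(ds.maxBy(_.milliseconds))
  }
```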
* [SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update ReceiverTracker and ReceiverSchedulingPolicy to use it | zsxwing | 2015-10-27 | 2 | -42/+72
This PR includes the following changes:
1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for an RDD.
2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of receivers when there are multiple executors on a host.
The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) on specific executors. Basically, I want to have more control over the placement of the receivers so that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic, but that does not allow specifying a particular executor as a preferred location, only the host level. So if there are two executors on the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. The current code only specifies the host as a preference, which may end up launching both receivers on the same executor. We tried to work around it by restarting a receiver when it does not launch on the desired executor, hoping that next time it will be started on the right one, but that causes lots of restarts and delays in correctly launching the receiver. So this change allows the streaming scheduler to specify the exact executor as the preferred location. Also, this is not exposed to the user; only the streaming scheduler uses it. Author: zsxwing <zsxwing@gmail.com> Closes #9181 from zsxwing/executor-location.
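A small sketch of the preferred-location string format quoted above ("executor_<host>_<executorID>"); the helper names are illustrative, not the Spark scheduler API:
```
def executorLocation(host: String, executorId: String): String =
  s"executor_${host}_$executorId"

def parseExecutorLocation(loc: String): Option[(String, String)] =
  loc.split("_", 3) match {
    case Array("executor", host, execId) => Some((host, execId))
    case _                               => None
  }

// executorLocation("localhost", "2") == "executor_localhost_2"
```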
* [SPARK-11324][STREAMING] Flag for closing Write Ahead Logs after a write | Burak Yavuz | 2015-10-27 | 1 | -7/+25
Currently the Write Ahead Log in Spark Streaming flushes data as writes need to be made. S3 does not support flushing of data; data is written only once the stream is actually closed. In case of failure, the data for the last minute (default rolling interval) will not be properly written. Therefore we need a flag to close the stream after the write, so that we achieve read-after-write consistency. cc tdas zsxwing Author: Burak Yavuz <brkyvz@gmail.com> Closes #9285 from brkyvz/caw-wal.
* [SPARK-5569][STREAMING] Fix ObjectInputStreamWithLoader to support loading array classes | maxwell | 2015-10-27 | 1 | -2/+33
When using the Kafka DirectStream API to create a checkpoint and restoring the saved checkpoint on restart, a ClassNotFound exception would occur. The reason for this error is that ObjectInputStreamWithLoader extends the ObjectInputStream class and overrides its resolveClass method, but instead of using Class.forName(desc, false, loader), Spark uses loader.loadClass(desc) to instantiate the class, which does not work with array classes. For example: Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange.", false, loader) works well, while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange") would throw a class not found exception. Details of the difference between Class.forName and loader.loadClass can be found here: http://bugs.java.com/view_bug.do?bug_id=6446627 Author: maxwell <maxwellzdm@gmail.com> Author: DEMING ZHU <deming.zhu@linecorp.com> Closes #8955 from maxwellzdm/master.
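A sketch of the resolveClass override this fix describes: Class.forName with initialize = false resolves array descriptors that ClassLoader.loadClass cannot. The exact Spark code may differ slightly:
```
import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

class ObjectInputStreamWithLoader(in: InputStream, loader: ClassLoader)
  extends ObjectInputStream(in) {

  override def resolveClass(desc: ObjectStreamClass): Class[_] = {
    try {
      // initialize = false: resolve the name (including "[L...;" array forms)
      // against the given loader without running static initializers.
      Class.forName(desc.getName, false, loader)
    } catch {
      case _: ClassNotFoundException => super.resolveClass(desc)
    }
  }
}
```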
* [SPARK-10984] Simplify *MemoryManager class structure | Josh Rosen | 2015-10-25 | 1 | -1/+1
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code uses only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
  - [x] AbstractBytesToBytesMapSuite
  - [x] UnsafeExternalSorterSuite
  - [x] UnsafeFixedWidthAggregationMapSuite
  - [x] UnsafeKVExternalSorterSuite
**Compatibility notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com> Closes #9127 from JoshRosen/SPARK-10984.
* [SPARK-11063][STREAMING] Change preferredLocations of Receiver's RDD to hosts rather than hostports | zsxwing | 2015-10-19 | 1 | -0/+24
The format of an RDD's preferredLocations must be a hostname, but the format of the Streaming receiver's scheduling executors is hostport, so it doesn't work. This PR converts `schedulerExecutors` to `hosts` before creating the Receiver's RDD. Author: zsxwing <zsxwing@gmail.com> Closes #9075 from zsxwing/SPARK-11063.
* [SPARK-10974][STREAMING] Add progress bar for output operation column and use red dots for failed batches | zsxwing | 2015-10-16 | 3 | -20/+28
Screenshot: <img width="1363" alt="1" src="https://cloud.githubusercontent.com/assets/1000778/10342571/385d9340-6d4c-11e5-8e79-1fa4c3c98f81.png"> Also fixed the description and duration for output operations that don't have spark jobs. <img width="1354" alt="2" src="https://cloud.githubusercontent.com/assets/1000778/10342775/4bd52a0e-6d4d-11e5-99bc-26265a9fc792.png"> Author: zsxwing <zsxwing@gmail.com> Closes #9010 from zsxwing/output-op-progress-bar.
* [SPARK-11060][STREAMING] Fix some potential NPEs in DStream transformation | jerryshao | 2015-10-16 | 1 | -0/+66
This patch fixes:
1. Guards against NPEs in `TransformedDStream` when the parent DStream returns None instead of an empty RDD.
2. Verifies some input streams which will potentially return None.
3. Adds a unit test to verify the behavior when an input stream returns None.
cc tdas, please help to review, thanks a lot :). Author: jerryshao <sshao@hortonworks.com> Closes #9070 from jerryshao/SPARK-11060.
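A minimal sketch of the guard described in point 1 above, with an illustrative helper name: when a parent DStream yields no RDD for a batch, substitute an empty RDD instead of propagating null/None:
```
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def orEmpty[T: ClassTag](sc: SparkContext, parent: Option[RDD[T]]): RDD[T] =
  parent.getOrElse(sc.emptyRDD[T])
```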
* [SPARK-10772][STREAMING][SCALA] NullPointerException when transform function in DStream returns NULL | Jacker Hu | 2015-10-10 | 1 | -0/+13
Currently, the ```TransformedDStream``` uses ```Some(transformFunc(parentRDDs, validTime))``` as its compute return value. When the ```transformFunc``` somehow returns null, the following operator will hit a NullPointerException. This fix uses ```Option()``` instead of ```Some()``` to deal with the possible null value: when ```transformFunc``` returns ```null```, the option will turn the null into ```None```, and the downstream can handle ```None``` correctly. NOTE (2015-09-25): The latest fix checks the return value of the transform function; if it is ```NULL```, a Spark exception will be thrown. Author: Jacker Hu <gt.hu.chang@gmail.com> Author: jhu-chang <gt.hu.chang@gmail.com> Closes #8881 from jhu-chang/Fix_Transform.
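The heart of the fix in two lines of plain Scala: Option(null) collapses to None, while Some(null) wraps the null and defers the NPE to the next operator:
```
val wrapped: Option[String] = Some(null)   // Some(null): latent NPE downstream
val guarded: Option[String] = Option(null) // None: can be handled explicitly

assert(wrapped.isDefined)
assert(guarded.isEmpty)
```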
* [SPARK-10956] Common MemoryManager interface for storage and execution | Andrew Or | 2015-10-08 | 1 | -5/+8
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible. This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks. Author: Andrew Or <andrew@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Author: andrewor14 <andrew@databricks.com> Closes #9000 from andrewor14/memory-manager.
* [SPARK-10885][STREAMING] Display the failed output op in Streaming UI | zsxwing | 2015-10-06 | 1 | -2/+2
This PR implements the following features for both `master` and `branch-1.5`:
1. Display the failed output op count in the batch list.
2. Display the failure reason of an output op in the batch detail page.
Screenshots: <img width="1356" alt="1" src="https://cloud.githubusercontent.com/assets/1000778/10198387/5b2b97ec-67ce-11e5-81c2-f818b9d2f3ad.png"> <img width="1356" alt="2" src="https://cloud.githubusercontent.com/assets/1000778/10198388/5b76ac14-67ce-11e5-8c8b-de2683c5b485.png">
There are still two remaining problems in the UI:
1. If an output operation doesn't run any Spark job, we cannot get its duration, since now it's the sum of all jobs' durations.
2. If an output operation doesn't run any Spark job, we cannot get the description, since it's the latest job's call site.
We need to add new `StreamingListenerEvent`s about output operations to fix them, so I'd like to fix them only for `master` in another PR. Author: zsxwing <zsxwing@gmail.com> Closes #8950 from zsxwing/batch-failure.
* [SPARK-10900][STREAMING] Add output operation events to StreamingListener | zsxwing | 2015-10-05 | 1 | -0/+37
Add output operation events to StreamingListener so as to implement the following UI features:
1. Progress bar of a batch in the batch list.
2. Be able to display output operation `description` and `duration` when there is no Spark job in a Streaming job.
Author: zsxwing <zsxwing@gmail.com> Closes #8958 from zsxwing/output-operation-events.
* [SPARK-10692][STREAMING] Expose failureReasons in BatchInfo for streaming UI to clear failed batches | zsxwing | 2015-09-23 | 1 | -0/+76
Slightly modified version of #8818, all credit goes to zsxwing. Author: zsxwing <zsxwing@gmail.com> Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8892 from tdas/SPARK-10692.
* [SPARK-10769][STREAMING][TESTS] Fix o.a.s.streaming.CheckpointSuite.maintains rate controller | zsxwing | 2015-09-23 | 1 | -1/+5
Fixed the following failure in https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
```
In this test, it calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs. However, one race condition is that these two jobs can finish before the receiver is registered. Then `UpdateRateLimit` won't be sent to the receiver and `getDefaultBlockGeneratorRateLimit` cannot be updated. Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)
15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749
15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered jobs 3000 ms and 3500 ms, but the receiver was registered after jobs 3000 ms and 3500 ms finished. So we should make sure the receiver is online before running `advanceTimeWithRealDelay(ssc, 2)`. Author: zsxwing <zsxwing@gmail.com> Closes #8877 from zsxwing/SPARK-10769.
* [SPARK-10224][STREAMING] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping | zsxwing | 2015-09-23 | 2 | -3/+87
`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `currentBuffer` is not empty. Then the data in `currentBuffer` will be lost. To reproduce it, you can add `Thread.sleep(200)` in this line (https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContextSuite`. I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` to suspend at this line for a few milliseconds. There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called. Author: zsxwing <zsxwing@gmail.com> Closes #8417 from zsxwing/SPARK-10224.
* [SPARK-10652][SPARK-10742][STREAMING] Set meaningful job descriptions for all streaming jobs | Tathagata Das | 2015-09-22 | 1 | -1/+1
Here is the screenshot after adding the job descriptions to threads that run receivers and the scheduler thread running the batch jobs.
## All jobs page
* Added job descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9924165/cda4a372-5cb1-11e5-91ca-d43a32c699e9.png)
## All stages page
* Added stage descriptions with links to relevant batch details page
![image](https://cloud.githubusercontent.com/assets/663212/9923814/2cce266a-5cae-11e5-8a3f-dad84d06c50e.png)
## Streaming batch details page
* Added the +details link
![image](https://cloud.githubusercontent.com/assets/663212/9921977/24014a32-5c98-11e5-958e-457b6c38065b.png)
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8791 from tdas/SPARK-10652.
* [SPARK-10649][STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs | Tathagata Das | 2015-09-21 | 1 | -0/+32
The job group and job description information is passed through thread-local properties and gets inherited by child threads. In the case of Spark Streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense.
1. Job group: this is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect will be unpredictable. And it's not a valid use case anyway; to cancel a streaming context, call streamingContext.stop().
2. Job description: this is used to pass on nice text descriptions for jobs to show up in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of the streaming jobs to have the same description, and the description may or may not be related to streaming.
The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that all the subsequent child threads do not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming does not change those properties in the thread that called streamingContext.start(). Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8781 from tdas/SPARK-10649.
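A sketch of the thread-local properties involved, using the standard SparkContext API; "spark.job.description" is the property key Spark itself uses for job descriptions:
```
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
sc.setJobGroup("interactive-group", "ad-hoc query", interruptOnCancel = true)
sc.setLocalProperty("spark.job.description", "streaming batch for time 3000 ms")
// The fix effectively clears these in the thread that runs the streaming
// scheduler, so streaming jobs do not inherit an unrelated group/description.
sc.clearJobGroup()
sc.stop()
```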
* [SPARK-10547][TEST] Streamline / improve style of Java API tests | Sean Owen | 2015-09-12 | 2 | -425/+413
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order. Author: Sean Owen <sowen@cloudera.com> Closes #8706 from srowen/SPARK-10547.
* [SPARK-10071][STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream | zsxwing | 2015-09-08 | 1 | -9/+19
Output a warning when serializing QueueInputDStream rather than throwing an exception, to allow unit tests to use it. Moreover, this PR also throws a better exception when deserializing QueueInputDStream, to help the user find the problem easily. The previous exception is hard to understand: https://issues.apache.org/jira/browse/SPARK-8553 Author: zsxwing <zsxwing@gmail.com> Closes #8624 from zsxwing/SPARK-10071 and squashes the following commits: 847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
* [SPARK-9767] Remove ConnectionManager. | Reynold Xin | 2015-09-07 | 1 | -4/+6
We introduced the Netty network module for shuffle in Spark 1.2, and it has been on by default for 3 releases. The old ConnectionManager is difficult to maintain. If we merge the patch now, by the time it is released it will have been about a year for which ConnectionManager has been off by default. It's time to remove it. Author: Reynold Xin <rxin@databricks.com> Closes #8161 from rxin/SPARK-9767.
* [SPARK-9869][STREAMING] Wait for all event notifications before asserting results | robbins | 2015-09-03 | 1 | -0/+3
Author: robbins <robbins@uk.ibm.com> Closes #8589 from robbinspg/InputStreamSuite-fix.
* [SPARK-10369][STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver, since we may reuse it later | zsxwing | 2015-08-31 | 1 | -0/+51
`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it. Author: zsxwing <zsxwing@gmail.com> Closes #8538 from zsxwing/SPARK-10369.
* [SPARK-9613][CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters | Sean Owen | 2015-08-25 | 2 | -19/+9
Replace `JavaConversions` implicits with `JavaConverters`. Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet. Author: Sean Owen <sowen@cloudera.com> Closes #8033 from srowen/SPARK-9613.
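The migration described above in miniature: JavaConverters makes each conversion explicit via asScala/asJava instead of the implicit JavaConversions wrappers:
```
import scala.collection.JavaConverters._

val javaList = new java.util.ArrayList[String]()
javaList.add("a")
javaList.add("b")

val asScalaSeq: Seq[String] = javaList.asScala.toSeq       // explicit Java -> Scala
val backToJava: java.util.List[String] = asScalaSeq.asJava // explicit Scala -> Java
```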
* [SPARK-10210][STREAMING] Filter out non-existent blocks before creating BlockRDD | Tathagata Das | 2015-08-25 | 1 | -0/+156
When the write ahead log is not enabled, a recovered streaming driver still tries to run jobs using pre-failure block ids, and fails as the blocks do not exist in memory any more (and cannot be recovered, as the receiver WAL is not enabled). This occurs because the driver-side WAL of ReceivedBlockTracker recovers that past block information, and ReceiverInputDStream creates BlockRDDs even if those blocks do not exist. The solution in this PR is to filter out block ids that do not exist before creating the BlockRDD. In addition, it adds unit tests to verify other logic in ReceiverInputDStream. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8405 from tdas/SPARK-10210.
* [SPARK-10137][STREAMING] Avoid restarting receivers if scheduleReceivers returns balanced results | zsxwing | 2015-08-24 | 1 | -6/+7
This PR fixes the following cases for `ReceiverSchedulingPolicy`:
1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1). Let's assume r1 starts first on `host1`, as `scheduleReceivers` suggested, and tries to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected, since r1 is starting exactly where `scheduleReceivers` suggested. This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`.
2) Assume there are 3 executors (host1, host2, host3), each executor has 3 cores, and there are 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting; the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by the TaskScheduler. r3 is similar. Then at last, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle. This issue can be fixed by returning only executors that have the minimum weight rather than returning at least 3 executors.
Author: zsxwing <zsxwing@gmail.com> Closes #8340 from zsxwing/fix-receiver-scheduling.
* [SPARK-10098][STREAMING][TEST] Clean up the active context after tests in FailureSuite | Tathagata Das | 2015-08-18 | 1 | -10/+17
Failures in streaming.FailureSuite can leak StreamingContext and SparkContext, which fails all subsequent tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8289 from tdas/SPARK-10098.
* [SPARK-9966][STREAMING] Handle a couple of corner cases in PIDRateEstimator | Tathagata Das | 2015-08-14 | 1 | -28/+51
1. The rate estimator should not estimate any rate when there are no records in the batch, as there is no data to estimate the rate from. In the current state, it estimates and sets the rate to zero. That is incorrect.
2. The rate estimator should never set the rate to zero under any circumstances. Otherwise the system will stop receiving data and stop generating useful estimates (see reason 1). So the fix is to define a parameter that sets a lower bound on the estimated rate, so that the system always receives some data.
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8199 from tdas/SPARK-9966 and squashes the following commits: 829f793 [Tathagata Das] Fixed unit test and added comments 3a994db [Tathagata Das] Added min rate and updated tests in PIDRateEstimator
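A sketch of the two rules above with illustrative names: estimate nothing for an empty batch, and clamp any estimate to a configured minimum so receivers never stop pulling data:
```
def boundedRate(numRecords: Long, rawEstimate: Double, minRate: Double): Option[Double] =
  if (numRecords == 0) None                  // no data, nothing to learn from
  else Some(math.max(rawEstimate, minRate))  // never let the rate fall to zero
```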
* Disable JobGeneratorSuite "Do not clear received block data too soon". | Reynold Xin | 2015-08-09 | 1 | -1/+2
* [SPARK-9556][SPARK-9619][SPARK-9624][STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates | Tathagata Das | 2015-08-06 | 5 | -138/+355
In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generators with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track of and rate updates can be applied. In the process, I did some simplification and de-flaki-fication of some rate-controller-related tests. In particular:
- Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`)
- Made `RateControllerSuite` faster (by increasing the batch interval) and less flaky
- Changed a few internal APIs to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent in places)
- Updated the existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #7913 from tdas/SPARK-9556 and squashes the following commits: 41d4461 [Tathagata Das] fix scala style eb9fd59 [Tathagata Das] Updated kinesis receiver d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556 31da173 [Tathagata Das] Fix bug 12116df [Tathagata Das] Add BlockGeneratorSuite 74bd069 [Tathagata Das] Fix style 989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it 3ff618c [Tathagata Das] Fix test b40eff8 [Tathagata Das] slight refactoring f0df0f1 [Tathagata Das] Scala style fixes 51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator
* [SPARK-9504][STREAMING][TESTS] Fix o.a.s.streaming.StreamingContextSuite.stop gracefully again | zsxwing | 2015-08-04 | 1 | -1/+2
The test failure is here: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3150/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/ There is a race condition in TestReceiver: it may add one record and increase `TestReceiver.counter` after stopping `BlockGenerator`. This PR just adds `join` to wait for the pushing thread. Author: zsxwing <zsxwing@gmail.com> Closes #7934 from zsxwing/SPARK-9504-2 and squashes the following commits: cfd7973 [zsxwing] Wait for the thread to make sure we won't change TestReceiver.counter after stopping BlockGenerator
* [SPARK-9534][BUILD] Enable javac lint for scalac parity; fix a lot of build warnings, 1.5.0 edition | Sean Owen | 2015-08-04 | 2 | -32/+34
Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process. I'll explain several of the changes inline in comments. Author: Sean Owen <sowen@cloudera.com> Closes #7862 from srowen/SPARK-9534 and squashes the following commits: ea51618 [Sean Owen] Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.
* [SPARK-9504][STREAMING][TESTS] Use eventually to fix the flaky test | zsxwing | 2015-07-31 | 1 | -5/+5
The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. On a super overloaded Jenkins worker, the receiver may not be able to start in 500 milliseconds. Verified this in the log of https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There is no log about starting the receiver before this failure; that's why `assert(runningCount > 0)` failed. This PR replaces `awaitTerminationOrTimeout` with `eventually`, which should be more reliable. Author: zsxwing <zsxwing@gmail.com> Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits: 7af66a6 [zsxwing] Remove wrong assertion 5ba2c99 [zsxwing] Use eventually to fix the flaky test
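The pattern the fix adopts, sketched with ScalaTest's Eventually; runningCount here stands in for the test's receiver-driven counter:
```
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

@volatile var runningCount = 0
new Thread(new Runnable {
  def run(): Unit = { Thread.sleep(100); runningCount = 1 }
}).start()

// Poll until the condition holds (or a generous timeout expires on an
// overloaded worker) instead of sleeping a fixed 500 ms and hoping.
eventually(timeout(Span(10, Seconds))) {
  assert(runningCount > 0)
}
```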