aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-04-18 16:10:40 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2017-04-18 16:10:40 -0700
commit74aa0df8f7f132b62754e5159262e4a5b9b641ab (patch)
tree39d2680669f8e00785c3fe00ec5966f0819e8754
parentf654b39a63d4f9b118733733c7ed2a1b58649e3d (diff)
downloadspark-74aa0df8f7f132b62754e5159262e4a5b9b641ab.tar.gz
spark-74aa0df8f7f132b62754e5159262e4a5b9b641ab.tar.bz2
spark-74aa0df8f7f132b62754e5159262e4a5b9b641ab.zip
[SPARK-20377][SS] Fix JavaStructuredSessionization example
## What changes were proposed in this pull request? Extra accessors in java bean class causes incorrect encoder generation, which corrupted the state when using timeouts. ## How was this patch tested? manually ran the example Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #17676 from tdas/SPARK-20377.
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java9
1 files changed, 4 insertions, 5 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
index da3a5dfe86..d3c8516882 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
@@ -76,8 +76,6 @@ public final class JavaStructuredSessionization {
for (String word : lineWithTimestamp.getLine().split(" ")) {
eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
}
- System.out.println(
- "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size());
return eventList.iterator();
}
};
@@ -100,7 +98,7 @@ public final class JavaStructuredSessionization {
// If timed out, then remove session and send final update
if (state.hasTimedOut()) {
SessionUpdate finalUpdate = new SessionUpdate(
- sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true);
+ sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true);
state.remove();
return finalUpdate;
@@ -133,7 +131,7 @@ public final class JavaStructuredSessionization {
// Set timeout such that the session will be expired if no data received for 10 seconds
state.setTimeoutDuration("10 seconds");
return new SessionUpdate(
- sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false);
+ sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false);
}
}
};
@@ -215,7 +213,8 @@ public final class JavaStructuredSessionization {
public long getEndTimestampMs() { return endTimestampMs; }
public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }
- public long getDurationMs() { return endTimestampMs - startTimestampMs; }
+ public long calculateDuration() { return endTimestampMs - startTimestampMs; }
+
@Override public String toString() {
return "SessionInfo(numEvents = " + numEvents +
", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";