From 988c71457354b0a443471f501cef544a85b1a76a Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Fri, 23 Sep 2016 12:17:59 -0700
Subject: [SPARK-17643] Remove comparable requirement from Offset

For some sources, it is difficult to provide a global ordering based only
on the data in the offset. Since we don't use comparison for correctness,
lets remove it.

Author: Michael Armbrust

Closes #15207 from marmbrus/removeComparable.
---
 .../sql/execution/streaming/CompositeOffset.scala  | 30 -----------------
 .../spark/sql/execution/streaming/LongOffset.scala |  6 ----
 .../spark/sql/execution/streaming/Offset.scala     | 19 +++--------
 .../sql/execution/streaming/StreamExecution.scala  |  9 ++---
 .../apache/spark/sql/streaming/OffsetSuite.scala   | 39 ----------------------
 5 files changed, 9 insertions(+), 94 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index 729c8462fe..ebc6ee8184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -23,36 +23,6 @@ package org.apache.spark.sql.execution.streaming
  * vector clock that must progress linearly forward.
  */
 case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
-  /**
-   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
-   * or greater than the specified object.
-   */
-  override def compareTo(other: Offset): Int = other match {
-    case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
-      val comparisons = offsets.zip(otherComposite.offsets).map {
-        case (Some(a), Some(b)) => a compareTo b
-        case (None, None) => 0
-        case (None, _) => -1
-        case (_, None) => 1
-      }
-      val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
-      nonZeroSigns.size match {
-        case 0 => 0                       // if both empty or only 0s
-        case 1 => nonZeroSigns.head       // if there are only (0s and 1s) or (0s and -1s)
-        case _ =>                         // there are both 1s and -1s
-          throw new IllegalArgumentException(
-            s"Invalid comparison between non-linear histories: $this <=> $other")
-      }
-    case _ =>
-      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
-  }
-
-  private def sign(num: Int): Int = num match {
-    case i if i < 0 => -1
-    case i if i == 0 => 0
-    case i if i > 0 => 1
-  }
-
   /**
    * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
    * sources.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index bb176408d8..c5e8827777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming
  */
 case class LongOffset(offset: Long) extends Offset {
 
-  override def compareTo(other: Offset): Int = other match {
-    case l: LongOffset => offset.compareTo(l.offset)
-    case _ =>
-      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
-  }
-
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
index 2cc012840d..1f52abf277 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -19,19 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 /**
  * An offset is a monotonically increasing metric used to track progress in the computation of a
- * stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent
- * with `equals` and `hashcode`.
+ * stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global
+ * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no
+ * new data has arrived.
  */
-trait Offset extends Serializable {
-
-  /**
-   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
-   * or greater than the specified object.
-   */
-  def compareTo(other: Offset): Int
-
-  def >(other: Offset): Boolean = compareTo(other) > 0
-  def <(other: Offset): Boolean = compareTo(other) < 0
-  def <=(other: Offset): Boolean = compareTo(other) <= 0
-  def >=(other: Offset): Boolean = compareTo(other) >= 0
-}
+trait Offset extends Serializable {}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 220f77dc24..9825f19b86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -259,7 +259,7 @@ class StreamExecution(
       case (source, available) =>
         committedOffsets
           .get(source)
-          .map(committed => committed < available)
+          .map(committed => committed != available)
          .getOrElse(true)
     }
   }
@@ -318,7 +318,8 @@ class StreamExecution(
 
     // Request unprocessed data from all sources.
     val newData = availableOffsets.flatMap {
-      case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
+      case (source, available)
+        if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
         val current = committedOffsets.get(source)
         val batch = source.getBatch(current, available)
         logDebug(s"Retrieving data from $source: $current -> $available")
@@ -404,10 +405,10 @@ class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is indented for use primarily when writing tests.
    */
-  def awaitOffset(source: Source, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {
       val localCommittedOffsets = committedOffsets
-      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
     }
 
     while (notDone) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
index 9590af4e77..b65a987770 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -24,44 +24,12 @@ trait OffsetSuite extends SparkFunSuite {
   /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
   def compare(one: Offset, two: Offset): Unit = {
     test(s"comparison $one <=> $two") {
-      assert(one < two)
-      assert(one <= two)
-      assert(one <= one)
-      assert(two > one)
-      assert(two >= one)
-      assert(one >= one)
       assert(one == one)
       assert(two == two)
       assert(one != two)
       assert(two != one)
     }
   }
-
-  /** Creates test to check that non-equality comparisons throw exception. */
-  def compareInvalid(one: Offset, two: Offset): Unit = {
-    test(s"invalid comparison $one <=> $two") {
-      intercept[IllegalArgumentException] {
-        assert(one < two)
-      }
-
-      intercept[IllegalArgumentException] {
-        assert(one <= two)
-      }
-
-      intercept[IllegalArgumentException] {
-        assert(one > two)
-      }
-
-      intercept[IllegalArgumentException] {
-        assert(one >= two)
-      }
-
-      assert(!(one == two))
-      assert(!(two == one))
-      assert(one != two)
-      assert(two != one)
-    }
-  }
 }
 
 class LongOffsetSuite extends OffsetSuite {
@@ -79,10 +47,6 @@ class CompositeOffsetSuite extends OffsetSuite {
     one = CompositeOffset(None :: Nil),
     two = CompositeOffset(Some(LongOffset(2)) :: Nil))
 
-  compareInvalid( // sizes must be same
-    one = CompositeOffset(Nil),
-    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
   compare(
     one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
     two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
@@ -91,8 +55,5 @@ class CompositeOffsetSuite extends OffsetSuite {
     one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
     two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
 
-  compareInvalid(
-    one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), // vector time inconsistent
-    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
 }
--
cgit v1.2.3
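
Note: the sketch below is not part of the patch. It illustrates why equality is now sufficient:
after this change an Offset only needs well-defined `equals`/`hashCode`, so a source whose offsets
have no natural total order (for example, positions tracked per partition) can still drive progress
detection. `PartitionedOffset`, `hasNewData`, and the local `Offset` trait are hypothetical
stand-ins; the local trait merely mirrors the simplified `trait Offset extends Serializable {}`
introduced above, and `hasNewData` mirrors the `committed != available` check now used in
StreamExecution.

object OffsetEqualityExample {
  // Mirrors the simplified trait from the patch; case classes supply equals/hashCode.
  trait Offset extends Serializable

  // Hypothetical offset holding per-partition positions, with no natural total order.
  case class PartitionedOffset(positions: Map[String, Long]) extends Offset

  // New data is detected by inequality with the committed offset, not by ordering,
  // so no compareTo is required on the offset type.
  def hasNewData(committed: Option[Offset], available: Offset): Boolean =
    committed.map(_ != available).getOrElse(true)

  def main(args: Array[String]): Unit = {
    val committed = PartitionedOffset(Map("p0" -> 5L, "p1" -> 7L))
    val available = PartitionedOffset(Map("p0" -> 6L, "p1" -> 7L))
    println(hasNewData(Some(committed), available)) // true: the offsets differ
    println(hasNewData(Some(available), available)) // false: equal offsets, no new data
    println(hasNewData(None, available))            // true: nothing committed yet
  }
}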