aboutsummaryrefslogtreecommitdiff
path: root/tests/untried/pos/t5862.scala
blob: e3006ddc3f7a7807cd3f09b0a2c90a62ec103609 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package test

import java.io.DataOutput
import java.io.DataInput

/**
 * Interface for writing outputs from a DoFn.
 *
 * @tparam A the type of value accepted by [[emit]]
 */
trait Emitter[A] {
  /**
   * Writes a single output value. What happens to the value is left entirely
   * to the implementing class; this trait only fixes the signature.
   *
   * @param value the value to write
   */
  def emit(value: A): Unit
}

/**
 * A wrapper for a 'map' function tagged for a specific output channel.
 *
 * The class body is empty: everything it carries comes from its constructor
 * parameters, all of which are exposed as public `val`s. The second parameter
 * list is implicit, so the `Manifest`, `WireFormat` and `Ordering` instances
 * are resolved at the construction site of each subclass.
 *
 * @tparam A presumably the mapper's input type -- TODO confirm against subclasses
 * @tparam K presumably the output key type (it is the only one requiring an
 *           `Ordering`) -- TODO confirm
 * @tparam V presumably the output value type -- TODO confirm
 * @param tags the set of integer tags attached to this mapper
 * @param mA   `Manifest` for `A`
 * @param wtA  [[WireFormat]] instance for `A`
 * @param mK   `Manifest` for `K`
 * @param wtK  [[WireFormat]] instance for `K`
 * @param ordK `Ordering` instance for `K`
 * @param mV   `Manifest` for `V`
 * @param wtV  [[WireFormat]] instance for `V`
 */
abstract class TaggedMapper[A, K, V]
    (val tags: Set[Int])
    (implicit val mA: Manifest[A], val wtA: WireFormat[A],
              val mK: Manifest[K], val wtK: WireFormat[K], val ordK: Ordering[K],
              val mV: Manifest[V], val wtV: WireFormat[V])
  extends Serializable {
}

/**
 * Type-class for sending types across the Hadoop wire.
 *
 * As declared here it is an empty marker trait: it defines no members, so an
 * instance only witnesses that a `WireFormat` exists for `A`.
 *
 * @tparam A the type the instance is associated with
 */
trait WireFormat[A]

/**
 * Keeps a private, mutable registry of the [[TaggedMapper]]s that have been
 * added for each `DataSource`.
 *
 * NOTE(review): this file looks like a compile-only regression test, so the
 * exact shape of the statements in `addTaggedMapper` (an `if`/`else` whose
 * branches have different types, followed by an empty `foreach`) is presumably
 * deliberate -- confirm before simplifying any of it.
 */
class MapReduceJob {
  /**
   * Marker trait used as the key type of the mapper registry; it declares no
   * members. Being an inner trait, its type is tied to the enclosing
   * `MapReduceJob` instance.
   */
  trait DataSource

  import scala.collection.mutable.{ Set => MSet, Map => MMap }
  // Registry from a data source to the mutable set of mappers added for it.
  // Starts out empty and is only modified by addTaggedMapper.
  private val mappers: MMap[DataSource, MSet[TaggedMapper[_, _, _]]] = MMap.empty

  /**
   * Adds `m` to the set of mappers recorded for `input`, creating that set
   * first if `input` has no entry yet.
   *
   * @tparam A first type parameter of the mapper being added
   * @tparam K second type parameter of the mapper being added
   * @tparam V third type parameter of the mapper being added
   * @param input the data source the mapper is recorded under
   * @param m     the mapper to record
   */
  def addTaggedMapper[A, K, V](input: DataSource, m: TaggedMapper[A, K, V]): Unit = {
    // First mapper for this source: insert a fresh set holding just `m`.
    // Otherwise: add `m` to the set that is already stored for the source.
    if (!mappers.contains(input))
      mappers += (input -> MSet(m))
    else
      mappers(input) += m // : Unit

    // Walks every tag of the mapper, but the body is empty, so nothing is
    // done per tag.
    m.tags.foreach { tag =>
    }
  }
}