1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package test
import java.io.DataOutput
import java.io.DataInput
/**
 * Output sink used by a DoFn to write the values it produces.
 *
 * @tparam A type of the values this emitter accepts
 */
trait Emitter[A] {

  /**
   * Writes a single output value.
   *
   * @param value the value to emit
   */
  def emit(value: A): Unit
}
/**
 * Wrapper around a 'map' function that is tagged for specific output channels.
 *
 * Every piece of implicit evidence supplied at construction time is kept as a
 * public `val`, so it remains available from the instance later on.
 *
 * @param tags the set of integer tags attached to this mapper
 * @param mA   manifest for `A`
 * @param wtA  wire format for `A`
 * @param mK   manifest for `K`
 * @param wtK  wire format for `K`
 * @param ordK ordering for `K`
 * @param mV   manifest for `V`
 * @param wtV  wire format for `V`
 * @tparam A first type parameter (presumably the mapper's input type; confirm against subclasses)
 * @tparam K second type parameter (presumably the output key type; it is the only one requiring an `Ordering`)
 * @tparam V third type parameter (presumably the output value type; confirm against subclasses)
 */
abstract class TaggedMapper[A, K, V](val tags: Set[Int])(
    implicit val mA: Manifest[A],
    val wtA: WireFormat[A],
    val mK: Manifest[K],
    val wtK: WireFormat[K],
    val ordK: Ordering[K],
    val mV: Manifest[V],
    val wtV: WireFormat[V]
) extends Serializable
/**
 * Type-class for sending types across the Hadoop wire.
 *
 * The trait declares no members here; in this file it only appears as implicit
 * evidence required by `TaggedMapper` for each of its type parameters.
 *
 * @tparam A the type this wire format is evidence for
 */
trait WireFormat[A]
/**
 * Keeps track of the tagged mappers registered for each input of a job.
 */
class MapReduceJob {

  /** Type of a job input; instances are the keys under which mappers are grouped. */
  trait DataSource

  import scala.collection.mutable.{ Set => MSet, Map => MMap }

  /** Mappers registered so far, grouped by the input they were registered for. */
  private val mappers: MMap[DataSource, MSet[TaggedMapper[_, _, _]]] = MMap.empty

  /**
   * Registers `m` as a mapper for `input`.
   *
   * The first registration for an input creates its mapper set; later
   * registrations add to that same set. Because the mappers are held in a set,
   * registering an equal mapper twice for the same input has no further effect.
   *
   * @param input the data source the mapper is registered for
   * @param m     the tagged mapper to register
   * @tparam A first type parameter of the mapper
   * @tparam K second type parameter of the mapper
   * @tparam V third type parameter of the mapper
   */
  def addTaggedMapper[A, K, V](input: DataSource, m: TaggedMapper[A, K, V]): Unit = {
    // Single lookup: fetch the set for this input, creating it on first use.
    // The explicit element type keeps the invariant set at the map's value type.
    val registered = mappers.getOrElseUpdate(input, MSet.empty[TaggedMapper[_, _, _]])
    registered += m
  }
}
|