1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
package scala.collection.parallel.benchmarks.parallel_array
import scala.collection.parallel.benchmarks._
import scala.collection.parallel.mutable.ParArray
import extra166y.{ParallelArray => JSR166Array}
/** Wrapper around a single `Int`.
 *
 *  The constructor argument is exposed read-only as `in`; `num` is a separate
 *  mutable field that starts out with the same value.
 *
 *  @param in the wrapped value; never changes after construction
 */
class Cont(val in: Int) {
  /** Mutable slot, initialised from `in`. */
  var num: Int = in

  /** Decimal rendering of `in`; `num` does not take part in it. */
  override def toString: String = Integer.toString(in)
}
/** Predicates and binary operators over [[Cont]].
 *
 *  Each operation is offered twice: once as a plain Scala function value and
 *  once as the corresponding `extra166y.Ops` callback object.
 */
object Cont {
  /** True for containers whose `in` is greater than 100. */
  val pred: Cont => Boolean = _.in > 100

  /** Same test as `pred`, expressed as an `extra166y` predicate. */
  val predjsr: extra166y.Ops.Predicate[Cont] = new extra166y.Ops.Predicate[Cont] {
    def op(a: Cont): Boolean = a.in > 100
  }

  /** Stores `a.in + b.in` into `b.num` and hands back `b` itself (no allocation). */
  val op: (Cont, Cont) => Cont = (lhs, rhs) => {
    rhs.num = lhs.in + rhs.in
    rhs
  }

  /** Allocates a fresh container wrapping `a.in + b.in`; neither argument is mutated. */
  val opnew: (Cont, Cont) => Cont = (lhs, rhs) => new Cont(lhs.in + rhs.in)

  /** Function-value form of `heavyComputation`. */
  val opheavy: (Cont, Cont) => Cont = (lhs, rhs) => heavyComputation(lhs, rhs)

  /** Same operation as `op`, expressed as an `extra166y` reducer. */
  val reducer: extra166y.Ops.Reducer[Cont] = new extra166y.Ops.Reducer[Cont] {
    def op(a: Cont, b: Cont): Cont = {
      b.num = a.in + b.in
      b
    }
  }

  /** Same operation as `opnew`, expressed as an `extra166y` reducer. */
  val reducernew: extra166y.Ops.Reducer[Cont] = new extra166y.Ops.Reducer[Cont] {
    def op(a: Cont, b: Cont): Cont = new Cont(a.in + b.in)
  }

  /** Same operation as `opheavy`, expressed as an `extra166y` reducer. */
  val reducerheavy: extra166y.Ops.Reducer[Cont] = new extra166y.Ops.Reducer[Cont] {
    def op(a: Cont, b: Cont): Cont = heavyComputation(a, b)
  }

  /** Runs a fixed 50000-step integer loop over `a.in` and `b.in`.
   *
   *  Starting from `a.in * b.in`, every step either adds `b.in` (when
   *  `(step + a.in) % 3 == 0`) or subtracts `a.in`. The final value is
   *  written to `b.num`.
   *
   *  @param a supplies the first operand; only read
   *  @param b supplies the second operand and receives the result in `num`
   *  @return `b`, after its `num` field has been overwritten
   */
  def heavyComputation(a: Cont, b: Cont): Cont = {
    val lhs = a.in
    val rhs = b.in
    var acc = lhs * rhs
    var step = 0
    while (step < 50000) {
      if ((step + lhs) % 3 != 0) acc -= lhs
      else acc += rhs
      step += 1
    }
    b.num = acc
    b
  }
}
/** Benchmark base class whose backing collection can be rebuilt with `reset`
 *  and perturbed between runs with one of the `update*` methods.
 *
 *  Exactly one backing store is populated, selected by `runWhat`:
 *   - `"seq"`: the `arr` array (assigned here but declared outside this class,
 *     presumably in `SequentialOps` -- confirm)
 *   - `"par"`: the `pa` parallel array
 *   - `"jsr"`: the `jsrarr` JSR166 parallel array
 *
 *  The other stores keep their previous value (`null` for `pa` and `jsrarr`
 *  unless assigned elsewhere).
 *
 *  @note Construction calls `reset`, so an unrecognised `runWhat` makes the
 *        constructor throw `IllegalArgumentException`.
 *
 *  @param size        number of elements in the backing store
 *  @param parallelism parallelism level given to both fork/join pools
 *  @param runWhat     variant to prepare: `"seq"`, `"par"` or `"jsr"`
 *  @param elemcreator produces the element to store at a given index
 *  @param arrcreator  allocates the sequential array for a given length
 *  @param cls         element class handed to `JSR166Array.create`
 */
abstract class Resettable[T](val size: Int, val parallelism: Int, val runWhat: String,
  elemcreator: Int => T, arrcreator: Int => Array[Any], cls: Class[T])
extends Bench with SequentialOps[T] {
  /** Pool installed as the parallel collections' environment in the `"par"` variant. */
  val forkjoinpool = new scala.concurrent.forkjoin.ForkJoinPool(parallelism)
  forkjoinpool.setMaximumPoolSize(parallelism)

  /** Pool backing the JSR166 parallel array in the `"jsr"` variant. */
  val papool = new jsr166y.ForkJoinPool(parallelism)
  papool.setMaximumPoolSize(parallelism)

  var pa: ParArray[T] = null
  var jsrarr: JSR166Array[T] = null

  // Populate the selected store right away. This has to stay below the pool
  // and field initialisers above, because `reset` reads and assigns them.
  reset

  /** Rebuilds the store selected by `runWhat`, filling index `i` with `elemcreator(i)`.
   *
   *  @throws IllegalArgumentException if `runWhat` is not `"seq"`, `"par"` or `"jsr"`
   */
  def reset: Unit = runWhat match {
    case "seq" =>
      arr = arrcreator(size)
      for (i <- 0 until size) arr(i) = elemcreator(i)
    case "par" =>
      pa = new ParArray[T](size)
      // Route parallel-collection tasks through this benchmark's own pool.
      collection.parallel.tasksupport.environment = forkjoinpool
      for (i <- 0 until size) pa(i) = elemcreator(i)
    case "jsr" =>
      jsrarr = JSR166Array.create(size, cls, papool)
      for (i <- 0 until size) jsrarr.set(i, elemcreator(i))
    case _ => throw new IllegalArgumentException("Unknown type: " + runWhat)
  }

  /** Index of the next element to be swapped by the `update*` methods. */
  var updateCounter = 0

  /** Advances `updateCounter`, wrapping back to 0 once it reaches `size`. */
  def incUpdateCounter: Unit = {
    updateCounter += 1
    // The counter is used directly as an index by the update methods, so it
    // must never equal `size`: that would address `size` and `-1`.
    if (updateCounter >= size) updateCounter = 0
  }

  /** Swaps `arr(updateCounter)` with its mirror element and advances the counter. */
  def updateSeq: Unit = {
    val mirror = size - updateCounter - 1
    val tmp = arr(updateCounter)
    arr(updateCounter) = arr(mirror)
    arr(mirror) = tmp
    incUpdateCounter
  }

  /** Swaps `pa(updateCounter)` with its mirror element and advances the counter. */
  def updatePar: Unit = {
    val mirror = size - updateCounter - 1
    val tmp = pa(updateCounter)
    pa(updateCounter) = pa(mirror)
    pa(mirror) = tmp
    incUpdateCounter
  }

  /** Swaps element `updateCounter` of `jsrarr` with its mirror element and advances the counter. */
  def updateJsr: Unit = {
    val mirror = size - updateCounter - 1
    val tmp = jsrarr.get(updateCounter)
    jsrarr.set(updateCounter, jsrarr.get(mirror))
    jsrarr.set(mirror, tmp)
    incUpdateCounter
  }

  /** Prints parallelism, active thread count and steal count of `forkjoinpool`.
   *  `papool` is not reported.
   */
  override def printResults: Unit = {
    println(" --- Fork join pool state --- ")
    println("Parallelism: " + forkjoinpool.getParallelism)
    println("Active threads: " + forkjoinpool.getActiveThreadCount)
    println("Work stealings: " + forkjoinpool.getStealCount)
  }
}
|