aboutsummaryrefslogtreecommitdiff
path: root/python/tc.py
blob: 5dcc4317e0f1f7e6b55f7793cfbe58a13d031c7d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from rdd import SparkContext

# Context from the project-local `rdd` module. The two arguments are presumably
# the master ("local") and the application name -- TODO confirm against rdd.
# NOTE(review): "PythonWordCount" looks copied from a word-count example; it is
# left unchanged here because it is a runtime string.
sc = SparkContext("local", "PythonWordCount")

# Seed graph as a list of pairs; the join below treats each pair as a
# directed (source, destination) edge.
e = [(1, 2), (2, 3), (4, 1)]

# Running closure; it starts out as exactly the seed edges.
tc = sc.parallelizePairs(e)

# The seed edges with their endpoints swapped, (x, y) -> (y, x).
# The pair is indexed rather than unpacked in the lambda's parameter list:
# tuple parameters are Python 2-only syntax (removed by PEP 3113), while
# indexing is valid on both Python 2 and Python 3.
edges = tc.mapPairs(lambda pair: (pair[1], pair[0]))

# Pair counts before and after an expansion round. oldCount starts at 0 so
# that the first comparison against a non-empty closure differs.
oldCount = 0
nextCount = tc.count()

def project(x):
    """Return the two elements of ``x[1]`` in swapped order as a tuple.

    ``x`` is indexed as ``x[1][1]`` and ``x[1][0]``; ``x[0]`` is ignored.
    Presumably ``x`` is a join result shaped ``(key, (left, right))`` --
    TODO confirm against the ``join`` implementation in ``rdd``.
    """
    joined = x[1]
    return (joined[1], joined[0])

# Expand the closure until a round leaves the pair count unchanged.
while nextCount != oldCount:
    oldCount = nextCount
    # Join the current closure with the swapped seed edges, turn every join
    # result back into a pair with project(), merge those pairs into the
    # closure and drop duplicates.
    tc = tc.union(tc.join(edges).mapPairs(project)).distinct()
    nextCount = tc.count()

# nextCount was computed from the final `tc` (either just before the loop or
# at the end of its last iteration), so reuse it instead of counting again.
# The parenthesised single-argument print form produces the same output on
# Python 2 and Python 3.
print("TC has %i edges" % nextCount)
print(tc.collect())