# print Spark version
print("pyspark version:" + str(sc.version))
map
# sc is the SparkContext; parallelize creates an RDD from the passed object
x = sc.parallelize([1, 2, 3])
y = x.map(lambda x: (x, x ** 2))

# collect copies RDD elements to a list on the driver
print(x.collect())
print(y.collect())

# output:
[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]
flatMap
x = sc.parallelize([1, 2, 3])
y = x.flatMap(lambda x: (x, 100 * x, x ** 2))
print(x.collect())
print(y.collect())

# output:
[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]
mapPartitions
x = sc.parallelize([1, 2, 3], 2)
def f(iterator):
    yield sum(iterator)
y = x.mapPartitions(f)

# glom() gathers the elements of each partition into a list
print(x.glom().collect())
print(y.glom().collect())

# output:
[[1], [2, 3]]
[[1], [5]]
mapPartitionsWithIndex
# mapPartitionsWithIndex
x = sc.parallelize([1, 2, 3], 2)
def f(partitionIndex, iterator):
    yield (partitionIndex, sum(iterator))
y = x.mapPartitionsWithIndex(f)

# glom() gathers the elements of each partition into a list
print(x.glom().collect())
print(y.glom().collect())

# output:
[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]
getNumPartitions
x = sc.parallelize([1, 2, 3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

# output:
[[1], [2, 3]]
2
filter
x = sc.parallelize([1, 2, 3])
y = x.filter(lambda x: x % 2 == 1)  # keeps odd elements, filtering out even ones
print(x.collect())
print(y.collect())

# output:
[1, 2, 3]
[1, 3]
distinct
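x = sc.parallelize(['A', 'A', 'B'])
y = x.distinct()
print(x.collect())
print(y.collect())  # each element appears once; the order of the result is not guaranteed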
sample
x = sc.parallelize(range(7))
# call 'sample' 5 times
ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y.collect()))

# output (samples are random and vary from run to run; the sample:1 line is missing from this listing):
x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 5, 6]
sample:2 y = [0, 4, 5, 6]
sample:3 y = [0, 2, 6]
sample:4 y = [0, 3, 4]
takeSample
x = sc.parallelize(range(7))
# call 'takeSample' 5 times
ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y))  # takeSample already returns a list, so no collect() on y

# output (samples are random and vary from run to run):
x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 6]
sample:1 y = [6, 4, 2]
sample:2 y = [2, 0, 4]
sample:3 y = [5, 4, 1]
sample:4 y = [3, 1, 4]
union
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['D', 'C', 'A'])
z = x.union(y)
print(z.collect())

# output:
['A', 'A', 'B', 'D', 'C', 'A']
intersection
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['A', 'C', 'D'])
z = x.intersection(y)
print(z.collect())

# output (intersection removes duplicates):
['A']
sortByKey
x = sc.parallelize([('B', 1), ('A', 2), ('C', 3)])
y = x.sortByKey()
print(x.collect())
print(y.collect())

# output:
[('B', 1), ('A', 2), ('C', 3)]
[('A', 2), ('B', 1), ('C', 3)]
|