scala - Unexpected Spark caching behavior -


I've got a Spark program like this:

def foo(a: RDD[...], b: RDD[...]) = {
  val c = a.map(...)
  c.persist(StorageLevel.MEMORY_ONLY_SER)
  var current = b
  for (_ <- 1 to 10) {
    val next = someOtherRddOps(c, current)
    next.persist(StorageLevel.MEMORY_ONLY)
    current.unpersist()
    current = next
  }
  current.saveAsTextFile(...)
}

The strange behavior I'm seeing is that the Spark stages corresponding to val c = a.map(...) are happening 10 times. I would have expected that to happen only once because of the immediate caching on the next line, but that's not the case. When I look in the "Storage" tab of the running job, only a few of the partitions of c are cached.

Also, 10 copies of that stage show as "active". 10 copies of the stage corresponding to val next = someOtherRddOps(c, current) show as pending, and they alternate execution.

Am I misunderstanding how to get Spark to cache RDDs?
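One relevant detail is that persist() is lazy: it only marks the RDD for caching, and nothing is materialized until an action runs. A plain-Scala sketch of that semantics, with no Spark dependency (the CachedDataset class and its compute counter are hypothetical stand-ins for an RDD and its stage):

```scala
// Minimal plain-Scala model of Spark's lazy persist semantics.
// CachedDataset is hypothetical: compute() stands in for a stage,
// persist() only sets a flag, and the first action fills the cache.
class CachedDataset[T](compute: () => Seq[T]) {
  var computeCount = 0                 // how many times the "stage" ran
  private var persisted = false
  private var cache: Option[Seq[T]] = None

  def persist(): this.type = { persisted = true; this } // lazy: no work yet

  def collect(): Seq[T] = cache match {
    case Some(data) => data            // served from cache, stage not re-run
    case None =>
      computeCount += 1
      val data = compute()
      if (persisted) cache = Some(data)
      data
  }
}

val ds = new CachedDataset(() => (1 to 5).map(_ * 2))
ds.persist()                           // nothing computed yet
assert(ds.computeCount == 0)
ds.collect(); ds.collect(); ds.collect()
assert(ds.computeCount == 1)           // computed once, then served from cache
```

Under this model, seeing the stage appear again in the UI does not by itself mean the data was recomputed; whether it was depends on whether the cached partitions were actually available.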

Edit: here is a gist containing a program to reproduce this: https://gist.github.com/jfkelley/f407c7750a086cdb059c. It expects as input the edge list of a graph (with edge weights). For example:

a   b   1000.0
a   c   1000.0
b   c   1000.0
d   e   1000.0
d   f   1000.0
e   f   1000.0
g   h   1000.0
h   i   1000.0
g   i   1000.0
d   g   400.0

Lines 31-42 of the gist correspond to the simplified version above. I see 10 stages corresponding to line 31 when I would expect only 1.

Answer:

Caching doesn't reduce the number of stages; it just means the stage won't be recomputed every time.

In the first iteration, in the stage's "Input Size" you can see the data coming from Hadoop, and that it reads shuffle input. In subsequent iterations, the data comes from memory and there is no more shuffle input. Also, the execution time is vastly reduced.

New map stages are created whenever shuffle output has to be written, for example when there is a change in partitioning, in this case adding a key to the RDD.
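The partitioning change can be sketched without Spark: once each record gets a key, its target partition is determined by hashing that key, so the data has to be rewritten (a shuffle write) before the next stage can read it. A plain-Scala illustration (numPartitions and the sample records are made up; groupBy stands in for the shuffle):

```scala
// Plain-Scala sketch of a shuffle boundary: keying the records changes
// which partition each one belongs to, forcing a shuffle write between
// the map-side stage and the reduce-side stage.
val numPartitions = 3
val records = Seq("apple", "banana", "cherry", "date")

// Stage 1 (map side): key each record, then route it to a partition
// by hashing the new key -- this routing is what the shuffle writes out.
val keyed = records.map(r => (r.length, r))
val shuffled = keyed.groupBy { case (k, _) =>
  math.floorMod(k.hashCode, numPartitions)  // partition id for the key
}

// Stage 2 (reduce side) would read these regrouped partitions.
assert(shuffled.values.flatten.toSet == keyed.toSet)  // nothing lost, only moved
```

Each such boundary shows up as a separate map stage in the UI, which is why the loop above produces one per iteration even when the cached data upstream is reused.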

