scala - Spark out of memory when reducing by key


I'm working on an algorithm that requires math operations on large matrices. Basically, the algorithm involves the following steps:

Inputs: two vectors u and v of size n

  1. For each vector, compute the pairwise Euclidean distance between the elements in the vector. Return two matrices e_u and e_v.

  2. For each entry in the two matrices, apply a function f. Return two matrices m_u and m_v.

  3. Find the eigenvalues and eigenvectors of m_u. Return e_i, ev_i for i = 0,...,n-1.

  4. Compute the outer product of each eigenvector. Return a matrix o_i = ev_i*transpose(ev_i), i = 0,...,n-1.

  5. Adjust each eigenvalue with e_i = e_i + delta_i, where delta_i = sum of all elements(elementwise product of o_i and m_v)/(2*mu), and mu is a parameter.

  6. Finally, return a matrix a = elementwise sum of (e_i * o_i) over i = 0,...,n-1.
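Steps 4 to 6 can also be written compactly. The block below is only a restatement under two assumptions: the division in step 5 is read as division by 2*mu, and Q is a name introduced here (not in the original) for the matrix whose columns are the ev_i, chosen to avoid clashing with the input vector u.

```latex
\begin{aligned}
o_i &= \mathit{ev}_i\,\mathit{ev}_i^{\top}, \\
\delta_i &= \frac{1}{2\mu}\sum_{j,k}(o_i)_{jk}\,(m_v)_{jk}
          = \frac{1}{2\mu}\,\mathit{ev}_i^{\top}\,m_v\,\mathit{ev}_i, \\
a &= \sum_{i=0}^{n-1}(e_i+\delta_i)\,o_i
   = Q\,\operatorname{diag}(e_0+\delta_0,\dots,e_{n-1}+\delta_{n-1})\,Q^{\top}.
\end{aligned}
```

Read this way, each delta_i is a quadratic form in ev_i and a is a product of three matrices, so in principle neither needs the n separate n x n matrices o_i to be stored.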

The issue I'm facing is memory when n is large (15000 or more), since all the matrices here are dense. My current implementation may not be the best, but it partially worked.
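To put a rough number on the memory pressure, here is a back-of-the-envelope sketch. It assumes 8-byte Doubles and ignores JVM and Spark overhead; the object and method names are made up for illustration.

```scala
// Rough memory estimate for dense Double matrices (no JVM/Spark overhead included).
object DenseMemoryEstimate {
  val BytesPerDouble = 8L

  /** Bytes needed for one dense n x n matrix of Doubles. */
  def denseMatrixBytes(n: Long): Long = n * n * BytesPerDouble

  /** Bytes needed if all n outer-product matrices o_i were held at the same time. */
  def allOuterProductsBytes(n: Long): Long = n * denseMatrixBytes(n)

  def main(args: Array[String]): Unit = {
    val n = 15000L
    println(f"one dense matrix: ${denseMatrixBytes(n) / 1e9}%.1f GB")
    println(f"all n of the o_i: ${allOuterProductsBytes(n) / 1e12}%.1f TB")
  }
}
```

For n = 15000 this gives about 1.8 GB for a single matrix and about 27 TB for all the o_i together, which is why materializing every o_i is the expensive part.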

I used a RowMatrix for m_u and got its eigendecomposition using SVD.

The resulting U factor of the SVD is a row matrix whose columns are the ev_i's, so I have to manually transpose it so that its rows become the ev_i, keyed by i. The resulting e vector holds the eigenvalues e_i.

Since my previous attempt at directly mapping each row ev_i to o_i failed with an out-of-memory error, I'm currently doing

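    // u: pair RDD of (i, ev_i); ev_i is assumed to be a Breeze DenseVector[Double],
    // given the .t and DenseVector(...) usage further down
    val r = u
      .map { case (i, ev_i) => (i, ev_i.toArray.zipWithIndex) } // add an index to each element in a vector
      .flatMapValues(x => x)
      .join(u) // the eigenvector column is appended
      .map { case (eigenvecid, ((vecelement, elementid), eigenvec)) =>
        // row elementid of o_eigenvecid; written as vector * scalar
        (elementid, (eigenvecid, eigenvec * vecelement))
      }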
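To make the layout of r easier to see, here is a small local sketch using plain Scala collections instead of RDDs. It is not the Spark code itself: the object name, the method name, and the use of Array[Double] for vectors are all assumptions made for illustration.

```scala
// Local illustration of what r holds: one record per (row index j, eigenvector index i).
object OuterProductRows {
  type Vec = Array[Double]

  /** For each eigenvector ev_i, emit (j, (i, row j of o_i)), where row j of o_i = ev_i(j) * ev_i. */
  def rowsOfOuterProducts(u: Seq[(Int, Vec)]): Seq[(Int, (Int, Vec))] =
    u.flatMap { case (i, ev) =>
      ev.toSeq.zipWithIndex.map { case (x, j) => (j, (i, ev.map(_ * x))) }
    }
}
```

With n eigenvectors of length n this produces n * n records, each carrying a vector of length n, so r as a whole is as large as all the o_i together.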

To compute the adjusted e_i's in step 5 above, m_v is stored as an RDD of tuples (i, DenseVector).

    val deltardd = r.join(m_v)
      .map { case (j, ((i, row_j_of_o_i), row_j_of_m_v)) =>
        // dot product of row j of o_i with row j of m_v, scaled by 1 / (2 * mu)
        (i, row_j_of_o_i.t * DenseVector(row_j_of_m_v.toArray) / (2 * mu))
      }
      .reduceByKey(_ + _)
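Summing those per-row dot products over j gives the sum of all elements of the elementwise product of o_i and m_v, which equals the quadratic form transpose(ev_i) * m_v * ev_i. The local sketch below checks that identity on plain arrays. It is an illustration with made-up names, not a drop-in replacement for the Spark job.

```scala
// Two ways to compute delta_i for a single eigenvector, on plain arrays.
object DeltaWithoutOuterProduct {
  type Vec = Array[Double]
  type Mat = Array[Array[Double]]

  /** Step 5 done literally: build o = ev * ev^T, then sum(o elementwise-times m) / (2 * mu). */
  def deltaViaOuterProduct(ev: Vec, m: Mat, mu: Double): Double = {
    val o: Mat = ev.map(x => ev.map(y => x * y)) // n x n outer product
    val total = o.zip(m).map { case (oRow, mRow) =>
      oRow.zip(mRow).map { case (x, y) => x * y }.sum
    }.sum
    total / (2 * mu)
  }

  /** Same value as ev^T * m * ev / (2 * mu), without the n x n temporary. */
  def deltaViaQuadraticForm(ev: Vec, m: Mat, mu: Double): Double = {
    val mTimesEv = m.map(row => row.zip(ev).map { case (x, y) => x * y }.sum) // m * ev
    val quad = ev.zip(mTimesEv).map { case (x, y) => x * y }.sum             // ev . (m * ev)
    quad / (2 * mu)
  }
}
```

The second form needs only one matrix-vector product per eigenvector, so the intermediate data stays at O(n) per eigenvector rather than O(n^2).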

Finally, to compute the matrix a, again because of the memory issue, I have to first join the rows from the different RDDs and then reduce by key. Specifically,

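    // re-key by eigenvector index i so that delta_i can be joined in
    val r_rearranged = r.map { case (j, (i, row_j_of_o_i)) => (i, (j, row_j_of_o_i)) }
    val termsfora = r_rearranged.join(deltardd)
    // row j of a = sum over i of (e_i + delta_i) * (row j of o_i)
    val a = termsfora
      .map { case (i, ((j, row_j_of_o_i), delta_i)) => (j, row_j_of_o_i * (delta_i + e(i))) }
      .reduceByKey(_ + _)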

The above implementation worked up to the step of termsfora, which means that if I execute an action on termsfora, such as termsfora.take(1).foreach(println), it succeeds. But if I execute an action on a, such as a.count(), an OOM error occurs on the driver.

I tried to tune Spark's configuration to increase the driver memory as well as the parallelism level, but it still failed.
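For reference, tuning of this kind is usually expressed along the following lines. This is only a sketch: the application name and every value are hypothetical placeholders, not the settings actually used and not recommendations.

```scala
import org.apache.spark.SparkConf

object TuningSketch {
  // All values below are hypothetical placeholders.
  def buildConf(): SparkConf = new SparkConf()
    .setAppName("eigen-adjust-sketch")
    .set("spark.default.parallelism", "400") // default partition count for shuffles such as reduceByKey
    .set("spark.driver.maxResultSize", "2g") // cap on serialized results sent back to the driver
}
```

Note that in client mode the driver heap size is set with the --driver-memory option of spark-submit (or in the default properties file), not through SparkConf inside the application, because the driver JVM has already started by the time that code runs.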

Use IndexedRowMatrix instead of RowMatrix; it will help with conversions and the transpose. Suppose your IndexedRowMatrix is irm:

    val svd = irm.computeSVD(k, true)
    val u = svd.U
    // rows of ut are the columns of U
    val ut = u.toCoordinateMatrix().transpose().toIndexedRowMatrix()

You can convert irm to a BlockMatrix for multiplication with another distributed BlockMatrix.
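A minimal, self-contained sketch of that conversion and a distributed multiplication might look like this. The 3 x 3 input matrix, the local master, the application name, and the 2 x 2 block size are all made up for illustration. It assumes spark-mllib is on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, IndexedRow, IndexedRowMatrix}

object BlockMatrixSketch {
  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholders for illustration only.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("block-matrix-sketch"))
    try {
      // Hypothetical small symmetric matrix standing in for m_u.
      val rows = sc.parallelize(Seq(
        IndexedRow(0L, Vectors.dense(2.0, 1.0, 0.0)),
        IndexedRow(1L, Vectors.dense(1.0, 2.0, 1.0)),
        IndexedRow(2L, Vectors.dense(0.0, 1.0, 2.0))
      ))
      val irm = new IndexedRowMatrix(rows)

      val svd = irm.computeSVD(3, computeU = true)
      val u: IndexedRowMatrix = svd.U

      // Block size is arbitrary here; larger blocks would be used for real data.
      val uBlocks: BlockMatrix = u.toBlockMatrix(2, 2).cache()
      // Distributed product transpose(U) * U; should be close to the identity.
      val gram: BlockMatrix = uBlocks.transpose.multiply(uBlocks)

      println(gram.toLocalMatrix())
    } finally {
      sc.stop()
    }
  }
}
```

The same pattern (toBlockMatrix, transpose, multiply) keeps every operand distributed, so nothing of size n x n has to be collected on the driver until the final result is explicitly requested.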

