scala - Spark out of memory when reducing by key
I'm working on an algorithm that requires math operations on a large matrix. Basically, the algorithm involves the following steps:
Inputs: 2 vectors u and v of size n.
1. For each vector, compute the pairwise Euclidean distance between the elements of the vector. Return 2 matrices e_u and e_v.
2. For each entry in the 2 matrices, apply a function f. Return 2 matrices m_u and m_v.
3. Find the eigenvalues and eigenvectors of m_u. Return e_i and ev_i for i = 0,...,n-1.
4. Compute the outer product of each eigenvector. Return the matrices o_i = ev_i*transpose(ev_i), i = 0,...,n-1.
5. Adjust each eigenvalue: e_i = e_i + delta_i, where delta_i = (sum of the elements of the elementwise product of o_i and m_v)/(2*mu), and mu is a parameter.
6. Return the final matrix A = elementwise sum of (e_i * o_i) over i = 0,...,n-1.
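For small n, steps 1 and 2 can be sketched locally in plain Scala, which is handy for checking the distributed results. This is a minimal sketch under two assumptions not stated in the question: each element of u is a scalar (so the Euclidean distance between two elements is just |u(i) - u(j)|), and f is a hypothetical placeholder passed in by the caller.

```scala
object Steps12 {
  // Step 1: pairwise Euclidean distance between the scalar elements of a vector.
  // Assumption: elements are scalars, so the distance reduces to an absolute difference.
  def pairwiseDistances(u: Array[Double]): Array[Array[Double]] =
    Array.tabulate(u.length, u.length)((i, j) => math.abs(u(i) - u(j)))

  // Step 2: apply f to every entry of the matrix. f is a hypothetical placeholder.
  def applyElementwise(m: Array[Array[Double]], f: Double => Double): Array[Array[Double]] =
    m.map(_.map(f))
}
```

For example, `Steps12.applyElementwise(Steps12.pairwiseDistances(u), d => math.exp(-d))` would give m_u for one possible choice of f.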
The issue I'm facing is memory when n is large (15000 or more), since the matrices here are dense. My current implementation may not be the best, and it has only partially worked.
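A back-of-the-envelope estimate shows why n = 15000 is hard. The figures below assume 8-byte doubles and ignore JVM and Spark overhead, so real usage is higher. One dense n x n matrix is about 1.8 GB, while materializing every outer product o_i (which is what the question's r RDD does, one length-n row per (i, j) pair) means n^3 doubles, roughly 27 TB in total.

```scala
object MemoryEstimate {
  val BytesPerDouble = 8L

  // Bytes for one dense n x n matrix of doubles (no JVM/Spark overhead counted).
  def denseMatrixBytes(n: Long): Long = n * n * BytesPerDouble

  // Bytes for all n outer products o_i, each n x n: n^3 doubles in total.
  def allOuterProductsBytes(n: Long): Long = n * n * n * BytesPerDouble
}
```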
I used a RowMatrix for m_u and did the eigendecomposition using SVD.
The U factor of the resulting SVD is a RowMatrix whose columns are the ev_i's, so I have to transpose it manually so that its rows become the ev_i's. The resulting vector e holds the eigenvalues e_i.
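One caveat with using SVD for step 3: for a real symmetric matrix, the singular values are the absolute values of the eigenvalues, so if m_u is not positive semidefinite the signs of some e_i are lost. For example, [[1, 2], [2, 1]] has eigenvalues 3 and -1 but singular values 3 and 1. Assuming the singular values are distinct (so each column of U is an eigenvector up to sign), the signed eigenvalue can be recovered with the Rayleigh quotient ev_i^T m_u ev_i. A local sketch:

```scala
object SignedEigenvalues {
  // Rayleigh quotient v^T M v for a unit vector v.
  // If M is symmetric and v is a unit eigenvector of M, this equals its signed eigenvalue.
  def rayleigh(m: Array[Array[Double]], v: Array[Double]): Double = {
    val mv = m.map(row => row.zip(v).map { case (a, b) => a * b }.sum) // M * v
    mv.zip(v).map { case (a, b) => a * b }.sum                         // v . (M * v)
  }
}
```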
Since my previous attempt of directly mapping each row ev_i to o_i failed due to running out of memory, I'm now doing:
    val r = u.map { case (i, ev_i) => (i, ev_i.toArray.zipWithIndex) } // add an index to each element in the vector
      .flatMapValues(x => x)
      .join(u) // append the whole eigenvector as a column
      .map { case (eigenVecId, ((vecElement, elementId), eigenVec)) =>
        (elementId, (eigenVecId, vecElement * eigenVec)) // (j, (i, row j of o_i))
      }
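In local terms, r holds, for every element index j and eigenvector index i, row j of o_i, which is ev_i(j) * ev_i. The sketch below mirrors that keying with plain Scala collections; the `evs` array (eigenvector i stored as row i) is a hypothetical layout. It makes it easy to see that r has n^2 records, each a full length-n row.

```scala
object OuterProductRows {
  // For each eigenvector ev_i (row i of evs) and each element index j,
  // emit (j, (i, row j of o_i)), where row j of o_i = ev_i(j) * ev_i.
  def rows(evs: Array[Array[Double]]): Seq[(Int, (Int, Array[Double]))] =
    for {
      (ev, i) <- evs.toSeq.zipWithIndex
      (x, j)  <- ev.toSeq.zipWithIndex
    } yield (j, (i, ev.map(_ * x)))
}
```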
To compute the adjusted e_i's in step 5 above, m_v is stored as an RDD of tuples (i, DenseVector).
    val deltaRDD = r.join(m_v)
      .map { case (j, ((i, row_j_of_o_i), row_j_of_m_v)) =>
        (i, row_j_of_o_i.t * DenseVector(row_j_of_m_v.toArray) / (2 * mu))
      }
      .reduceByKey(_ + _)
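The delta in step 5 does not actually need o_i: by the standard identity sum_jk v_j v_k M_jk = v^T M v, the sum of the elementwise product of o_i = ev_i ev_i^T and m_v equals ev_i^T m_v ev_i, so delta_i = ev_i^T m_v ev_i / (2*mu). This is a suggestion rather than part of the original approach; in Spark it would mean computing m_v * ev_i with a distributed matrix product instead of joining n^2 rows. The local sketch checks the identity against the definition.

```scala
object Delta {
  // Definition from step 5: sum of the elementwise product of o = v v^T and m, over 2*mu.
  def deltaExplicit(v: Array[Double], m: Array[Array[Double]], mu: Double): Double = {
    val n = v.length
    var s = 0.0
    for (j <- 0 until n; k <- 0 until n) s += v(j) * v(k) * m(j)(k)
    s / (2 * mu)
  }

  // Equivalent quadratic form v^T m v / (2*mu): only the n-vector m*v is needed.
  def deltaQuadratic(v: Array[Double], m: Array[Array[Double]], mu: Double): Double = {
    val mv = m.map(row => row.zip(v).map { case (a, b) => a * b }.sum)
    mv.zip(v).map { case (a, b) => a * b }.sum / (2 * mu)
  }
}
```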
Finally, to compute A, again because of the memory issue, I have to first join rows from the different RDDs and then reduce by key. Specifically:
    val r_rearranged = r.map { case (j, (i, row_j_of_o_i)) => (i, (j, row_j_of_o_i)) }
    val termsForA = r_rearranged.join(deltaRDD)
    val A = termsForA
      .map { case (i, ((j, row_j_of_o_i), delta_i)) =>
        (j, (delta_i + e(i)) * row_j_of_o_i)
      }
      .reduceByKey(_ + _)
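Step 6 can likewise be written as a matrix product: with the eigenvectors as the columns of V and D = diag(e_i + delta_i), A = sum_i (e_i + delta_i) o_i = V D V^T. That form maps onto distributed matrix multiplication rather than a reduceByKey over n^2 full-length rows. The local sketch below (plain arrays, eigenvectors passed as rows of a hypothetical `evs`, weights `w(i)` standing for e_i + delta_i) checks that the two forms agree.

```scala
object FinalA {
  // Step 6 as written: accumulate w(i) * o_i, where o_i = ev_i ev_i^T and ev_i = evs(i).
  def sumOfOuterProducts(evs: Array[Array[Double]], w: Array[Double]): Array[Array[Double]] = {
    val n = evs.head.length
    val a = Array.ofDim[Double](n, n)
    for (i <- evs.indices; j <- 0 until n; k <- 0 until n)
      a(j)(k) += w(i) * evs(i)(j) * evs(i)(k)
    a
  }

  // Plain dense product x * y.
  def matmul(x: Array[Array[Double]], y: Array[Array[Double]]): Array[Array[Double]] =
    Array.tabulate(x.length, y.head.length) { (r, c) =>
      x(r).indices.map(t => x(r)(t) * y(t)(c)).sum
    }

  // V * diag(w) * V^T, with column i of V equal to evs(i) (so V^T is evs itself).
  def vdvt(evs: Array[Array[Double]], w: Array[Double]): Array[Array[Double]] = {
    val v = evs.transpose                                           // V
    val vd = v.map(row => row.zip(w).map { case (a, b) => a * b }) // V * diag(w)
    matmul(vd, evs)                                                 // (V * diag(w)) * V^T
  }
}
```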
The above implementation works up to the termsForA step: if I execute an action on termsForA, e.g. termsForA.take(1).foreach(println), it succeeds. But if I execute an action on A, e.g. A.count(), an OOM error occurs on the driver.
I tried tuning Spark's configuration to increase the driver memory and the parallelism level, but it didn't help.
Use IndexedRowMatrix instead of RowMatrix; it helps with the conversions and the transpose. Suppose your IndexedRowMatrix is irm:
    val svd = irm.computeSVD(k, computeU = true)
    val u = svd.U
    val uT = u.toCoordinateMatrix().transpose().toIndexedRowMatrix() // rows are now the eigenvectors ev_i
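Conceptually, transposing in coordinate form just swaps the row and column index of every (i, j, value) entry, so each entry moves independently; only the conversion back to indexed rows has to regroup entries by row. The local sketch below uses a hypothetical `Entry` case class as a stand-in for MLlib's `MatrixEntry`; it illustrates the idea and is not Spark's code.

```scala
object CoordinateTranspose {
  // Hypothetical stand-in for an (i, j, value) matrix entry.
  final case class Entry(i: Long, j: Long, value: Double)

  // Transposing in coordinate form is a per-entry index swap.
  def transpose(entries: Seq[Entry]): Seq[Entry] =
    entries.map(e => Entry(e.j, e.i, e.value))

  // Regroup entries into dense rows keyed by row index
  // (roughly what converting back to an indexed-row layout requires).
  def toRows(entries: Seq[Entry], numCols: Int): Map[Long, Array[Double]] =
    entries.groupBy(_.i).map { case (i, es) =>
      val row = Array.fill(numCols)(0.0)
      es.foreach(e => row(e.j.toInt) = e.value)
      i -> row
    }
}
```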
You can also convert irm to a BlockMatrix (with toBlockMatrix()) to multiply it with another distributed BlockMatrix.
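The reason block multiplication helps is that it works on sub-blocks rather than whole rows, so no single step needs a full length-n row paired with a whole matrix. The local sketch below shows the blocked-product idea on plain arrays; the block size `bs` is a hypothetical parameter, loosely analogous to the rowsPerBlock/colsPerBlock arguments of toBlockMatrix, and this is not Spark's implementation.

```scala
object BlockedMultiply {
  // C = A * B computed block by block: C(I,J) += A(I,K) * B(K,J) for every block triple.
  def multiply(a: Array[Array[Double]], b: Array[Array[Double]], bs: Int): Array[Array[Double]] = {
    val n = a.length; val m = b.length; val p = b.head.length
    require(a.head.length == m, "inner dimensions must match")
    val c = Array.ofDim[Double](n, p)
    for {
      i0 <- 0 until n by bs // block row of C
      j0 <- 0 until p by bs // block column of C
      k0 <- 0 until m by bs // block along the shared dimension
      i  <- i0 until math.min(i0 + bs, n)
      j  <- j0 until math.min(j0 + bs, p)
      k  <- k0 until math.min(k0 + bs, m)
    } c(i)(j) += a(i)(k) * b(k)(j)
    c
  }
}
```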