apache spark - Scala: Remove columns whose column total is below the median of all column totals


In the data reduction phase of my analysis, I want to remove any column whose column total is below the median value of all column totals. For example, given the dataset:

v1,v2,v3
1  3  5
3  4  3

I sum the columns:

v1,v2,v3
4  7  8

The median is 7, so I drop v1:

v2,v3
3  5
4  3

I thought I could use a streaming function on the row, but that does not seem possible.

The code I have come up with works, but it seems verbose and looks a lot like Java code (which I take as a sign that I am doing it wrong).

Are there more efficient ways of performing this operation?

import org.apache.spark.sql.DataFrame
import scala.collection.mutable.ArrayBuffer

val dfv2 = DataFrameUtils.openFile(spark, "c:\\users\\jake\\__workspace\\r\\datafiles\\ikodadatamanipulation\\verb2.csv")

//return a single-row dataframe with the sum of each column
val dfv2Summed: DataFrame = dfv2.groupBy().sum()

logger.info(s"dfv2Summed col count ${dfv2Summed.schema.fieldNames.length}")

//get the row values
val rowValues: Array[Long] = dfv2Summed.head().getValuesMap(dfv2Summed.schema.fieldNames).values.toArray

//sort the values
scala.util.Sorting.quickSort(rowValues)

//calculate the median (simplistically)
val median: Long = rowValues(rowValues.length / 2)

//ArrayBuffer to hold the columns that need removing
var columnArray: ArrayBuffer[String] = ArrayBuffer[String]()

//get key/value pairs of columnName -> value
val entries: Map[String, Long] = dfv2Summed.head().getValuesMap(dfv2Summed.schema.fieldNames)
entries.foreach {
  //find columns whose total value is below the median
  kv =>
    if (kv._2 < median) {
      columnArray += kv._1
    }
}

//drop the columns (stripping the "sum(...)" wrapper from the aggregated column names)
val dropColumns: Seq[String] = columnArray.map(s => s.substring(s.indexOf("sum(") + 4, s.length - 1)).toSeq
logger.info(s"toDrop ${dropColumns.size} : ${dropColumns}")
val reducedDf = dfv2.drop(dropColumns: _*)
logger.info(s"reducedDf col count ${reducedDf.schema.fieldNames.length}")

After calculating the sum of each column in Spark, you can compute the median value in plain Scala and then select the columns with totals greater than or equal to it by their column indices.

Let's start by defining a function for calculating the median, a slight modification of this example:

def median(seq: Seq[Long]): Long = {
  //sort first, in case 'seq' is not already sorted
  val sortedSeq = seq.sortWith(_ < _)

  if (seq.size % 2 == 1) sortedSeq(sortedSeq.size / 2)
  else {
    val (up, down) = sortedSeq.splitAt(seq.size / 2)
    (up.last + down.head) / 2
  }
}
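As a quick sanity check (a hypothetical usage snippet, not part of the original answer), the column totals from the question give the expected result:

median(Seq(4L, 7L, 8L))   //returns 7, matching the worked example above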

We first calculate the sums of all columns and convert the result to Seq[Long]:

import org.apache.spark.sql.functions._

val sums = df.select(df.columns.map(c => sum(col(c)).alias(c)): _*)
             .first.toSeq.asInstanceOf[Seq[Long]]
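For the sample data in the question, sums would hold the column totals 4, 7 and 8. Aliasing each sum back to its original column name is also what avoids the "sum(...)" name cleanup needed in the question's code.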

Then we calculate the median,

val med = median(sums) 

and use it as a threshold to generate the column indices to keep:

val cols_keep = sums.zipWithIndex.filter(_._1 >= med).map(_._2)
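With the example totals and med = 7, only the indices of v2 and v3 survive the filter (a worked illustration, not from the original answer):

Seq(4L, 7L, 8L).zipWithIndex.filter(_._1 >= 7L).map(_._2)   //List(1, 2), i.e. keep v2 and v3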

Finally, we map these indices inside a select() statement:

df.select(cols_keep map df.columns map col: _*).show()

+---+---+
| v2| v3|
+---+---+
|  3|  5|
|  4|  3|
+---+---+
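Putting the pieces together, here is a minimal self-contained sketch of the same approach. It assumes a local SparkSession named spark and builds the question's sample rows in memory instead of reading the CSV, so treat it as an illustration rather than the original code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("dropBelowMedian").getOrCreate()
import spark.implicits._

//the sample data from the question
val df = Seq((1L, 3L, 5L), (3L, 4L, 3L)).toDF("v1", "v2", "v3")

def median(seq: Seq[Long]): Long = {
  val sortedSeq = seq.sortWith(_ < _)
  if (seq.size % 2 == 1) sortedSeq(sortedSeq.size / 2)
  else {
    val (up, down) = sortedSeq.splitAt(seq.size / 2)
    (up.last + down.head) / 2
  }
}

//column totals, aliased back to the original column names
val sums = df.select(df.columns.map(c => sum(col(c)).alias(c)): _*)
             .first.toSeq.asInstanceOf[Seq[Long]]

val med = median(sums)                                           //7 for this data
val cols_keep = sums.zipWithIndex.filter(_._1 >= med).map(_._2)  //List(1, 2)

df.select(cols_keep map df.columns map col: _*).show()           //prints only v2 and v3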
