apache spark - scala: Remove columns where the column total is below the median value of all column totals
In the data reduction phase of my analysis, I want to remove any column whose column total is below the median value of all column totals. The dataset:
v1,v2,v3
1,3,5
3,4,3
I sum the columns:
v1,v2,v3
4,7,8
The median is 7, so I drop v1:
v2,v3
3,5
4,3
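For reference, here is a minimal sketch of the toy dataset as a Spark DataFrame (the session setup and the variable name df are my own additions for reproducing the example, not part of the original post; in spark-shell the spark session already exists):

import org.apache.spark.sql.SparkSession

// Assumed local session for experimenting
val spark = SparkSession.builder().master("local[*]").appName("column-reduction").getOrCreate()
import spark.implicits._

// The example data from above
val df = Seq((1, 3, 5), (3, 4, 3)).toDF("v1", "v2", "v3")
df.show()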
I thought a streaming function over the row would work, but that does not seem possible.
The code I have come up with works, but it seems verbose and looks a lot like Java code (which I take as a sign that I am doing something wrong).
Are there more efficient ways of performing this operation?
import org.apache.spark.sql.DataFrame
import scala.collection.mutable.ArrayBuffer

val dfv2 = DataFrameUtils.openFile(spark, "c:\\users\\jake\\__workspace\\r\\datafiles\\ikodadatamanipulation\\verb2.csv")

//returns a single-row DataFrame with the sum of each column
val dfv2Summed: DataFrame = dfv2.groupBy().sum()
logger.info(s"dfv2Summed col count ${dfv2Summed.schema.fieldNames.length}")

//get the row values
val rowValues: Array[Long] = dfv2Summed.head().getValuesMap(dfv2Summed.schema.fieldNames).values.toArray

//sort the values
scala.util.Sorting.quickSort(rowValues)

//calculate the median (simplistically)
val median: Long = rowValues(rowValues.length / 2)

//ArrayBuffer to hold the names of the columns that need removing
var columnArray: ArrayBuffer[String] = ArrayBuffer[String]()

//get key/value pairs of columnName -> column total
val entries: Map[String, Long] = dfv2Summed.head().getValuesMap(dfv2Summed.schema.fieldNames)

entries.foreach {
  //find all columns whose total is below the median
  kv => if (kv._2 < median) {
    columnArray += kv._1
  }
}

//drop the columns (strip the "sum(...)" wrapper to recover the original column names)
val dropColumns: Seq[String] = columnArray.map(s => s.substring(s.indexOf("sum(") + 4, s.length - 1)).toSeq
logger.info(s"toDrop ${dropColumns.size} : ${dropColumns}")

val reducedDf = dfv2.drop(dropColumns: _*)
logger.info(s"reducedDf col count ${reducedDf.schema.fieldNames.length}")
After calculating the sum of each column in Spark, we can compute the median value in plain Scala and then select the columns whose sums are greater than or equal to that value by their column indices.
Let's start by defining a function for calculating the median, a slight modification of this example:
def median(seq: Seq[Long]): Long = {
  //In case you are not sure that 'seq' is sorted
  val sortedSeq = seq.sortWith(_ < _)

  if (seq.size % 2 == 1) sortedSeq(sortedSeq.size / 2)
  else {
    val (up, down) = sortedSeq.splitAt(seq.size / 2)
    (up.last + down.head) / 2
  }
}
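As a quick sanity check, using the column totals from the question plus one extra value of my own to show the even-sized case (note the Long division):

median(Seq(4L, 7L, 8L))      // = 7, the odd-sized case from the example
median(Seq(4L, 7L, 8L, 9L))  // = (7 + 8) / 2 = 7 with integer division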
We first calculate the sums of all columns and convert the result to Seq[Long]:
import org.apache.spark.sql.functions._

val sums = df.select(df.columns.map(c => sum(col(c)).alias(c)): _*)
  .first.toSeq.asInstanceOf[Seq[Long]]
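For the example DataFrame this yields the column totals from the question. The cast to Seq[Long] relies on the columns being integer-typed, since sum over integer columns returns Long values (the assert below is only an illustrative check, not part of the original answer):

assert(sums == Seq(4L, 7L, 8L))  // totals of v1, v2, v3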
Then we calculate the median:
val med = median(sums)
and use it as a threshold to generate the column indices to keep:
val cols_keep = sums.zipWithIndex.filter(_._1 >= med).map(_._2)
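With the example totals and med = 7 this keeps the indices of v2 and v3 (again only an illustrative check):

assert(cols_keep == Seq(1, 2))  // v1's total of 4 falls below the median, so index 0 is dropped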
Finally, we map these indices to column names inside a select() statement:
df.select(cols_keep map df.columns map col: _*).show()

+---+---+
| v2| v3|
+---+---+
|  3|  5|
|  4|  3|
+---+---+
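For convenience, the answer's steps can also be wrapped into a single helper. This is only a sketch under the same assumptions (all columns are integer-typed so their sums come back as Long, and the median() function defined above is in scope); the name dropColumnsBelowMedianSum is mine, not part of the original answer:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum}

def dropColumnsBelowMedianSum(df: DataFrame): DataFrame = {
  // Column totals as a plain Scala sequence
  val sums = df.select(df.columns.map(c => sum(col(c)).alias(c)): _*)
    .first.toSeq.asInstanceOf[Seq[Long]]

  // Threshold: median of the totals, using the median() helper defined above
  val med = median(sums)

  // Keep only the columns whose total is at least the median
  val colsKeep = sums.zipWithIndex.filter(_._1 >= med).map(_._2)
  df.select(colsKeep map df.columns map col: _*)
}

// Usage on the example DataFrame:
// dropColumnsBelowMedianSum(df).show()   // leaves only v2 and v3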