python - Pysparkling 2: reset monotonically_increasing_id from 1
I want to split a Spark DataFrame into 2 pieces and define a row number for each sub-DataFrame. I found the function monotonically_increasing_id, but it still defines the row number from the original DataFrame.
Here is what I did in Python:
```python
from pyspark.sql.functions import monotonically_increasing_id

# df is the original Spark DataFrame
splits = df.randomSplit([7.0, 3.0], 400)

# add a "rowid" column to both sub-DataFrames
set1 = splits[0].withColumn("rowid", monotonically_increasing_id())
set2 = splits[1].withColumn("rowid", monotonically_increasing_id())

# check the results
set1.select("rowid").show()
set2.select("rowid").show()
```
I expected the first 5 elements of rowid in both frames to be 1 to 5 (or 0 to 4, I can't remember clearly):
set1: 1 2 3 4 5
set2: 1 2 3 4 5
But what I got is:
set1: 1 3 4 7 9
set2: 2 5 6 8 10
The row IDs of the 2 sub-frames are the row IDs from the original DataFrame df, not new ones.
As a Spark newbie, I'm looking for help on why this happened and how to fix it.
First of all, which version of Spark are you using? The implementation of the monotonically_increasing_id method has been changed a few times. I can reproduce your problem in Spark 2.0, but the behavior seems to be different in Spark 2.2, so it may be a bug that was fixed in a newer Spark release.
That being said, you should not expect the values generated by monotonically_increasing_id to increase consecutively. In your code, I believe there is only 1 partition in the DataFrame. According to http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html:
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
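The bit layout quoted from the documentation can be checked with plain Python arithmetic. This is a minimal sketch that only mirrors the documented layout (partition ID shifted left by 33 bits, plus the record number); `make_id` and `split_id` are hypothetical helper names, not part of the PySpark API, and Spark's real implementation may differ between versions.

```python
# Hypothetical helpers mirroring the documented layout of
# monotonically_increasing_id: partition ID in the upper 31 bits,
# record number within the partition in the lower 33 bits.
RECORD_BITS = 33
RECORD_MASK = (1 << RECORD_BITS) - 1


def make_id(partition_id, record_number):
    """Build an ID the way the docs describe it."""
    return (partition_id << RECORD_BITS) + record_number


def split_id(row_id):
    """Recover (partition_id, record_number) from an ID."""
    return row_id >> RECORD_BITS, row_id & RECORD_MASK


# Two partitions with 3 records each, as in the documentation example.
ids = [make_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

Decoding an ID such as 8589934593 with `split_id` gives `(1, 1)`: the second record of the second partition, which is why IDs jump between partitions instead of counting up one by one.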
So in your code you should not expect rowid to increase consecutively.
Besides, you should consider the caching and failure scenarios. Even if monotonically_increasing_id worked as you expect, increasing the value consecutively, it would still not be reliable. What if a node fails? The partitions on the failed node are regenerated from the source or from the last cache/checkpoint, and they might come back in a different order with different rowids. Eviction from the cache causes the same issue: suppose that after generating the DataFrame you cache it in memory, and it is later evicted. A future action will try to regenerate the DataFrame, which gives different rowids.