How to avoid data loss in Spark Structured Streaming when defining the schema
I have a simple employee JSON string coming in from Kafka, and I have defined the schema as below. If any additional fields come in, I am not able to access them, for example the "ssn" field shown in the sample JSON. Is there a way I can put the not-defined fields into a key-value map?
I don't want to use the infer-schema option, so please don't recommend that as an answer.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SimpleKafkaProcessorExt {

    public static void main(String[] args) throws Exception {
        String brokers = "quickstart:9092";
        String topics = "simple_topic_6";
        String master = "local[*]";

        SparkSession sparkSession = SparkSession
                .builder().appName(SimpleKafkaProcessorExt.class.getName())
                .master(master).getOrCreate();
        SQLContext sqlContext = sparkSession.sqlContext();
        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");

        // Explicit schema for the employee record; any incoming field that is
        // not declared here (e.g. "ssn") is silently dropped by from_json.
        List<StructField> employeeFields = new ArrayList<>();
        employeeFields.add(DataTypes.createStructField("firstName", DataTypes.StringType, true));
        employeeFields.add(DataTypes.createStructField("lastName", DataTypes.StringType, true));
        employeeFields.add(DataTypes.createStructField("email", DataTypes.StringType, true));

        List<StructField> addressFields = new ArrayList<>();
        addressFields.add(DataTypes.createStructField("city", DataTypes.StringType, true));
        addressFields.add(DataTypes.createStructField("state", DataTypes.StringType, true));
        addressFields.add(DataTypes.createStructField("zip", DataTypes.StringType, true));
        ArrayType addressStruct = DataTypes.createArrayType(
                DataTypes.createStructType(addressFields));
        employeeFields.add(DataTypes.createStructField("addresses", addressStruct, true));
        StructType employeeSchema = DataTypes.createStructType(employeeFields);

        Dataset<Row> rawDataset = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", brokers)
                .option("subscribe", topics)
                .load();
        rawDataset.printSchema();
        rawDataset.createOrReplaceTempView("basicView");

        // Cast the Kafka value bytes to a string; the alias strValue is what
        // the withColumn call below refers to.
        Dataset<Row> processedDataset = sqlContext.sql(
                "SELECT value, STRING(value) AS strValue FROM basicView");
        // Note: the sample JSON nests the record under an outer "employee" key,
        // so this schema needs a matching outer struct for from_json to find the fields.
        processedDataset = processedDataset.withColumn("employeeRecord",
                functions.from_json(processedDataset.col("strValue"), employeeSchema, new HashMap<>()));
    }
}
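For a single field whose name is known up front, one workaround (a minimal sketch; the "$.employee.ssn" path is an assumption taken from the sample payload below) is to pull it straight out of the raw string with get_json_object, though this does not help for fields that are not known in advance:

// Sketch: ad-hoc extraction of one undeclared field from the raw JSON string.
// The path assumes the layout of the sample JSON below.
processedDataset = processedDataset.withColumn("ssn",
        functions.get_json_object(processedDataset.col("strValue"), "$.employee.ssn"));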
Sample JSON:

{
  "employee": {
    "firstName": "manjesh",
    "lastName": "gowda",
    "ssn": "1234",
    "addresses": [
      { "street": "36th", "city": "nyc", "state": "ny" },
      { "street": "37th", "city": "nyc", "state": "ny" }
    ]
  }
}