How to avoid data loss in Spark Structured Streaming when defining a schema


I have a simple employee JSON string coming in from Kafka, and I have defined the schema below. If any additional fields come in, I am not able to access them — for example the "ssn" field shown in the sample JSON. Is there a way I can capture the not-defined fields in a key-value map?

I don't want to use the infer-schema option, so please don't recommend that as an answer.

public static void main(String[] args) throws Exception {
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";
    String master = "local[*]";

    SparkSession sparkSession = SparkSession
            .builder().appName(SimpleKafkaProcessorExt.class.getName())
            .master(master).getOrCreate();
    SQLContext sqlContext = sparkSession.sqlContext();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");

    List<StructField> employeeFields = new ArrayList<>();
    employeeFields.add(DataTypes.createStructField("firstName", DataTypes.StringType, true));
    employeeFields.add(DataTypes.createStructField("lastName", DataTypes.StringType, true));
    employeeFields.add(DataTypes.createStructField("email", DataTypes.StringType, true));

    List<StructField> addressFields = new ArrayList<>();
    addressFields.add(DataTypes.createStructField("city", DataTypes.StringType, true));
    addressFields.add(DataTypes.createStructField("state", DataTypes.StringType, true));
    addressFields.add(DataTypes.createStructField("zip", DataTypes.StringType, true));
    ArrayType addressStruct = DataTypes.createArrayType(DataTypes.createStructType(addressFields));

    employeeFields.add(DataTypes.createStructField("addresses", addressStruct, true));
    StructType employeeSchema = DataTypes.createStructType(employeeFields);

    Dataset<Row> rawDataset = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics).load();
    rawDataset.printSchema();
    rawDataset.createOrReplaceTempView("basicView");

    Dataset<Row> processedDataset = sqlContext.sql(
            "SELECT value, CAST(value AS STRING) AS strValue FROM basicView");
    processedDataset = processedDataset.withColumn("employeeRecord",
            functions.from_json(processedDataset.col("strValue"), employeeSchema, new HashMap<>()));

Sample JSON

{
  "employee": {
    "firstName": "Manjesh",
    "lastName": "Gowda",
    "ssn": "1234",
    "addresses": [
      {
        "street": "36th",
        "city": "NYC",
        "state": "NY"
      },
      {
        "street": "37th",
        "city": "NYC",
        "state": "NY"
      }
    ]
  }
}
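The post itself contains no answer, but one possible approach (a sketch, not from the original question; the class name `ExtraFieldsSketch` is hypothetical) is to keep the raw JSON string column alongside the parsed struct and pull undeclared fields on demand with `functions.get_json_object`, which takes a JSON path and is independent of the declared schema, so extra fields like `ssn` are never lost:

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class ExtraFieldsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName(ExtraFieldsSketch.class.getName())
                .master("local[*]").getOrCreate();

        // Stand-in for one Kafka message; in the streaming job this would be
        // the strValue column produced by CAST(value AS STRING).
        String json = "{\"employee\":{\"firstName\":\"Manjesh\",\"ssn\":\"1234\"}}";
        Dataset<String> raw = spark.createDataset(
                Collections.singletonList(json), Encoders.STRING());

        // get_json_object extracts any field by JSON path, whether or not it
        // appears in the StructType used for from_json.
        Dataset<Row> withSsn = raw.withColumn("ssn",
                functions.get_json_object(functions.col("value"), "$.employee.ssn"));

        String ssn = withSsn.first().getAs("ssn");
        System.out.println(ssn); // prints 1234
        spark.stop();
    }
}
```

If the payload under a given key is flat (string values only), `functions.from_json` can alternatively be pointed at a `MapType(StringType, StringType)` to land every key in a single map column; for nested payloads like the addresses array, the JSON-path approach above is the safer bet.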

