Reading data from a Kafka topic via GoldenGate -


I have Spring Boot code that reads data from a Kafka topic. The code works as expected when data is fed to the topic via the Kafka console producer. However, when I try to push data to the Kafka topic via GoldenGate, the code doesn't read the data from the topic, although I can see that GoldenGate is able to write data to the Kafka topic. Can anyone suggest why there is this change in behavior?

import java.util.arraylist; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.map.entry;  import org.bson.document; import org.json.jsonarray; import org.json.jsonobject;   import com.fasterxml.jackson.databind.objectmapper; import com.mongodb.basicdbobject; import com.mongodb.mongoclient; import com.mongodb.client.mongocollection; import com.mongodb.client.mongodatabase;  import kafka.consumer.consumeriterator; import kafka.consumer.kafkastream;  public class videoconsumer implements runnable {     private objectmapper objectmapper;     private kafkastream<byte[], byte[]> kafkastream;     private int threadnumber;      public videoconsumer(kafkastream<byte[], byte[]> kafkastream, int threadnumber) {         this.threadnumber = threadnumber;         this.kafkastream = kafkastream;         this.objectmapper = new objectmapper();     }     @override     public void run() {         consumeriterator<byte[], byte[]> = kafkastream.iterator();          while (it.hasnext()) {             byte[] messagedata = it.next().message();             try {                 //string videofrommessage = objectmapper.readvalue(messagedata, string.class);                 //byte[] videofrommessage = it.next().message();                 //system.out.print("got message");                  string streamdata = new string(messagedata);                 system.out.print("thread:" + threadnumber + ".consuming video: " + streamdata + "\n");                 string changed=streamdata.tostring();                 int pos=changed.lastindexof("}}");                 string change=changed.substring(0,pos );                 change=change.replace("}}", "}},");                 string res=change.concat("}}");                   string result="[" +res+ "]";                 system.out.println(result);                 jsonarray json;                      json = new jsonarray(result);                     map<string, list<jsonobject>> ordermongo = new hashmap<>();                
     map<string, list<jsonobject>> orderitemmongo = new hashmap<>();                     mongoclient mongoclient = new mongoclient( "localhost" , 27017 );                     mongodatabase db = mongoclient.getdatabase("mongotest");                       mongocollection<document> table = db.getcollection("test1");                     document doc1=new document();                     //gson gson=new gson();                     basicdbobject document = new basicdbobject();                      (int = 0; < json.length(); i++) {                         jsonobject obj = json.getjsonobject(i);                        if(obj.getstring("table").equals("test.s_order_mongo1")){                         list<jsonobject> list = ordermongo.getordefault(obj.getstring("table").equals("test.s_order_mongo1"),new arraylist<>());                         list.add(obj);                         ordermongo.put(obj.getjsonobject("after").getstring("row_id"),list);                        }                        else if(obj.getstring("table").equals("test.s_order_item_mongo1")){                            list<jsonobject> nextlist = orderitemmongo.getordefault(obj.getstring("table").equals("test.s_order_item_mongo1"),new arraylist<>());                         nextlist.add(obj);                      orderitemmongo.put(obj.getjsonobject("after").getstring("order_id"),nextlist);                         }                     }                     system.out.println(ordermongo);                     system.out.println(orderitemmongo);                    // system.out.println(orderitemmongo);                     (entry<string, list<jsonobject>> entry : ordermongo.entryset()) {                         for(entry<string, list<jsonobject>> entry1 : orderitemmongo.entryset()){                             if(entry.getkey().equals(entry1.getkey())){                             //string gsonstring=gson.tojson(entry.getvalue());                               //system.out.println(gsonstring);                 
                list<jsonobject> listnext = entry.getvalue();                                 list <jsonobject> orderlinelist=entry1.getvalue();                                 for(jsonobject obj:listnext){                             document doc = new document("status_cd", obj.getjsonobject("after").getstring("status_cd"));                                 if(obj.getjsonobject("after").isnull("integration_id")==true){                             doc.append("integration_id", null);}                              doc.append("x_cust_ref", obj.getjsonobject("after").getstring("x_cust_ref"));                             doc.append("req_ship_dt",obj.getjsonobject("after").getstring("req_ship_dt"));                              if(obj.getjsonobject("after").isnull("quote_id")==true){                             doc.append("quote_id",null);}                              doc.append("accnt_id",obj.getjsonobject("after").getstring("accnt_id"));                             doc.append("active_flg",obj.getjsonobject("after").getstring("active_flg"));                             doc.append("process_timestamp",obj.getjsonobject("after").getstring("process_timestamp"));                             doc.append("contact_id",obj.getjsonobject("after").getstring("contact_id"));                             doc.append("bu_id", obj.getjsonobject("after").getstring("bu_id"));                             doc.append("ship_con_id",obj.getjsonobject("after").getstring("ship_con_id"));                             doc.append("last_upd", obj.getjsonobject("after").getstring("last_upd"));                              if(obj.getjsonobject("after").isnull("x_close_dt")==true){                             doc.append("x_close_dt", null);}                              doc.append("x_sub_stat", obj.getjsonobject("after").getstring("x_sub_stat"));                             doc.append("order_num", obj.getjsonobject("after").getstring("order_num"));                             doc.append("soft_delete", 
obj.getjsonobject("after").getstring("soft_delete"));                             doc.append("row_id", obj.getjsonobject("after").getstring("row_id"));                             doc.append("last_upd_by",obj.getjsonobject("after").getstring("last_upd_by"));                             doc.append("rev_num",obj.getjsonobject("after").getstring("rev_num"));                             doc.append("order_dt", obj.getjsonobject("after").getstring("order_dt"));                             for(jsonobject object:orderlinelist){                                 if(object.getjsonobject("after").isnull("asset_id")==true){                                      doc1.append("asset_id", null);}                                  if(object.getjsonobject("after").isnull("serv_accnt_id")==true){                                 doc1.append("serv_accnt_id", null);}                                 doc1.append("req_ship_dt",object.getjsonobject("after").getstring("req_ship_dt"));                                  if(object.getjsonobject("after").isnull("x_prod_desc")==true){                                 doc1.append("x_prod_desc",null);}                                  if(object.getjsonobject("after").isnull("ship_con_id")==true){                                 doc1.append("ship_con_id",null);}                                  doc1.append("x_bes_status",object.getjsonobject("after").getstring("x_bes_status"));                                 doc1.append("row_id",object.getjsonobject("after").getstring("row_id"));                                 doc1.append("status_cd",object.getjsonobject("after").getstring("status_cd"));                                 doc1.append("order_id",object.getjsonobject("after").getstring("order_id"));                                 if(object.getjsonobject("after").isnull("completed_dt")==true){                                 doc1.append("completed_dt",null);}                                  
doc1.append("last_upd",object.getjsonobject("after").getstring("last_upd"));                                 doc1.append("soft_delete",object.getjsonobject("after").getstring("soft_delete"));                                 doc1.append("integration_id",object.getjsonobject("after").getstring("integration_id"));                                 doc1.append("x_cdd",object.getjsonobject("after").getstring("x_cdd"));                                 doc1.append("action_cd",object.getjsonobject("after").getstring("action_cd"));                                 doc1.append("x_order_item_substatus",object.getjsonobject("after").getstring("x_order_item_substatus"));                                 if(object.getjsonobject("after").isnull("x_appt_ref")==true){                                  doc1.append("x_appt_ref",null);}                                  if(object.getjsonobject("after").isnull("x_cancelled_dt")==true){                                 doc1.append("x_cancelled_dt",null);}                                  doc1.append("prod_id",object.getjsonobject("after").getstring("prod_id"));                                  if(object.getjsonobject("after").isnull("service_num")==true){                                  doc1.append("service_num",null);}                                  if(object.getjsonobject("after").isnull("must_dlvr_by_dt")==true){                                   doc1.append("must_dlvr_by_dt",null);}                                  doc1.append("rollup_flg",object.getjsonobject("after").getstring("rollup_flg"));                                 doc1.append("root_order_item_id",object.getjsonobject("after").getstring("root_order_item_id"));                                 doc1.append("bill_accnt_id",object.getjsonobject("after").getstring("bill_accnt_id"));                                 doc1.append("process_timestamp",object.getjsonobject("after").getstring("process_timestamp"));                                 
doc1.append("qty_req",object.getjsonobject("after").getstring("qty_req"));                              }                             doc.append("orderline", doc1);                             table.insertone(doc);                                 }                             }                     }             }        }                       catch (exception e) {                 e.printstacktrace();         }          system.out.println("shutting down thread: " + kafkastream);         }     } }     


Comments

Popular posts from this blog

angular - Ionic slides - dynamically add slides before and after -

Add a dynamic header in angular 2 http provider -

minify - Minimizing css files -