mongodb - Unable to write data to Mongo from Spark 2.2.0 Structured Streaming


I have the following code, but I am unable to write data to Mongo with it. I don't see the database or collection being created in MongoDB, so something seems to be wrong, yet there are no exceptions when I run the code.

    private SparkSession sparkSession;

    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster(Configuration.getConfig().getString("spark.master"));
    sparkConf.set("spark.mongodb.input.uri", "mongodb://localhost/analytics.counters");
    sparkConf.set("spark.mongodb.output.uri", "mongodb://localhost/analytics.counters");
    SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
    sparkSession.sparkContext().setLogLevel("INFO");
    this.sparkSession = sparkSession;

    MongoConnector mongoConnector = MongoConnector.apply(sparkSession.sparkContext());
    WriteConfig writeConfig = getMongoWriteConfig(sparkSession, "hello");
    ReadConfig readConfig = getMongoReadConfig(sparkSession, "hello");

    Dataset<String> jsonDS = newDS.select(to_json(struct(col("*")))).as(Encoders.STRING());

    Dataset<Boolean> dataset = jsonDS
            .map(new MapFunction<String, Boolean>() {
                @Override
                public Boolean call(String kafkaPayload) throws Exception {
                    System.out.println(kafkaPayload);
                    Document jsonDocument = Document.parse(kafkaPayload);
                    String id = jsonDocument.getString("id");
                    jsonDocument.put("_id", id);
                    return mongoConnector.withCollectionDo(writeConfig, Document.class,
                            new Function<MongoCollection<Document>, Boolean>() {
                                @Override
                                public Boolean call(MongoCollection<Document> collection) throws Exception {
                                    return collection.replaceOne(
                                            and(eq("_id", id), lt("timestamp", jsonDocument.getString("timestamp"))),
                                            jsonDocument, new UpdateOptions().upsert(true)).wasAcknowledged();
                                }
                            });
                }
            }, Encoders.BOOLEAN());

    StreamingQuery query1 = dataset
            .writeStream()
            .trigger(Trigger.ProcessingTime(1000))
            .foreach(new KafkaSink("metrics"))
            .option("checkpointLocation", getCheckpointPath(CheckpointPath.LOCAL_WRITE) + "/metrics")
            .start();
    query1.awaitTermination();

    private static ReadConfig getMongoReadConfig(SparkSession sparkSession, String collectionName) {
        ReadConfig readConfig = ReadConfig.create(sparkSession);
        Map<String, String> readOverrides = new HashMap<String, String>();
        readOverrides.put("readConcern.level", "majority");
        readConfig.withOptions(readOverrides);
        return readConfig;
    }

    private static WriteConfig getMongoWriteConfig(SparkSession sparkSession, String collectionName) {
        WriteConfig writeConfig = WriteConfig.create(sparkSession);
        Map<String, String> writeOverrides = new HashMap<String, String>();
        writeOverrides.put("writeConcern.w", "majority");
        writeConfig.withOptions(writeOverrides);
        return writeConfig;
    }
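
To rule out connection or configuration issues first, here is a minimal standalone sketch of the kind of check I have in mind: upsert one hard-coded document through the same MongoConnector/WriteConfig, outside of any streaming query. The test id/timestamp values and the printed message are just for illustration; it assumes the same imports and static Filters imports as the job above.

    // Standalone sanity check (hypothetical): write one document via the
    // same MongoConnector / WriteConfig used by the streaming job.
    WriteConfig writeConfig = WriteConfig.create(sparkSession);
    MongoConnector mongoConnector = MongoConnector.apply(sparkSession.sparkContext());

    final Document testDocument = Document.parse(
            "{\"id\": \"test-1\", \"timestamp\": \"2017-09-12T17:16:18.001Z\"}");
    testDocument.put("_id", testDocument.getString("id"));

    Boolean acknowledged = mongoConnector.withCollectionDo(writeConfig, Document.class,
            new Function<MongoCollection<Document>, Boolean>() {
                @Override
                public Boolean call(MongoCollection<Document> collection) throws Exception {
                    // Plain upsert by _id, without the timestamp condition, so it always writes.
                    return collection.replaceOne(eq("_id", testDocument.getString("id")),
                            testDocument, new UpdateOptions().upsert(true)).wasAcknowledged();
                }
            });
    System.out.println("standalone write acknowledged: " + acknowledged);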

I use spark-submit and pass in the following parameters:

spark-submit --master local[*] \
  --driver-memory 4g \
  --executor-memory 2g \
  --class com.hello.stream.app.Hello \
  --conf "spark.mongodb.input.uri=mongodb://localhost/analytics.counters" \
  --conf "spark.mongodb.output.uri=mongodb://localhost/analytics.counters" \
  build/libs/hello-stream.jar

Here is the list of dependencies I use:

def sparkVersion = '2.2.0'
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: sparkVersion
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
compile group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: sparkVersion
compile 'org.mongodb:mongodb-driver:3.0.4'

When I run the job I get the following output (a shortened version of the INFO log):

17/09/12 10:16:12 INFO MongoClientCache: Closing MongoClient: [localhost:27017]
17/09/12 10:16:12 INFO connection: Closed connection [connectionId{localValue:2, serverValue:2897}] to localhost:27017 because the pool has been closed.
17/09/12 10:16:18 INFO StreamExecution: Streaming query made progress: {
  "id" : "ddc38876-c44d-4370-a2e0-3c96974e6f24",
  "runId" : "2ae73227-b9e1-4908-97d6-21d9067994c7",
  "name" : null,
  "timestamp" : "2017-09-12T17:16:18.001Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 2,
    "triggerExecution" : 2
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[hello]]",
    "startOffset" : {
      "pn_ingestor_json" : {
        "0" : 826404
      }
    },
    "endOffset" : {
      "pn_ingestor_json" : {
        "0" : 826404
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@7656801e"
  }
}

... and it keeps going, printing INFO StreamExecution: Streaming query made progress: over and over, but I never see the database or collection being created in Mongo.
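
To double-check from the Mongo side, this is the kind of quick verification I can run with the plain Java driver that is already on the classpath (mongodb-driver 3.0.4); the host/port and names below simply mirror the URIs above:

    // Hypothetical verification snippet using the plain MongoDB Java driver.
    // Needs: com.mongodb.MongoClient, com.mongodb.client.MongoDatabase, java.util.ArrayList
    MongoClient mongoClient = new MongoClient("localhost", 27017);
    MongoDatabase database = mongoClient.getDatabase("analytics");

    // List the collections that currently exist in the "analytics" database.
    System.out.println("collections: "
            + database.listCollectionNames().into(new ArrayList<String>()));

    // Count documents in the target collection (0 if nothing was written).
    System.out.println("counters count: " + database.getCollection("counters").count());

    mongoClient.close();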

I also tried passing the full class path (--class) in the submit command:

spark-submit --master local[*] \
  --driver-memory 4g \
  --class class/package.path.inside.the.jar \
  --executor-memory 2g \
  --conf "spark.mongodb.input.uri=mongodb://localhost/analytics.counters" \
  --conf "spark.mongodb.output.uri=mongodb://localhost/analytics.counters" \
  build/libs/hello-stream.jar
