mongodb - Unable to write data to Mongo from Spark 2.2.0 Structured Streaming?
I have the following code but am unable to write data to Mongo with it. I don't see the database or collection names being populated in MongoDB. Something seems wrong, yet there are no exceptions when I run the code.
private SparkSession sparkSession;

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster(Configuration.getConfig().getString("spark.master"));
sparkConf.set("spark.mongodb.input.uri", "mongodb://localhost/analytics.counters");
sparkConf.set("spark.mongodb.output.uri", "mongodb://localhost/analytics.counters");
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
sparkSession.sparkContext().setLogLevel("INFO");
this.sparkSession = sparkSession;

MongoConnector mongoConnector = MongoConnector.apply(sparkSession.sparkContext());
WriteConfig writeConfig = getMongoWriteConfig(sparkSession, "hello");
ReadConfig readConfig = getMongoReadConfig(sparkSession, "hello");

Dataset<String> jsonDS = newDS.select(to_json(struct(col("*")))).as(Encoders.STRING());

Dataset<Boolean> dataset = jsonDS
    .map(new MapFunction<String, Boolean>() {
        @Override
        public Boolean call(String kafkaPayload) throws Exception {
            System.out.println(kafkaPayload);
            Document jsonDocument = Document.parse(kafkaPayload);
            String id = jsonDocument.getString("id");
            jsonDocument.put("_id", id);
            return mongoConnector.withCollectionDo(writeConfig, Document.class,
                new Function<MongoCollection<Document>, Boolean>() {
                    @Override
                    public Boolean call(MongoCollection<Document> collection) throws Exception {
                        return collection.replaceOne(
                            and(eq("_id", id), lt("timestamp", jsonDocument.getString("timestamp"))),
                            jsonDocument,
                            new UpdateOptions().upsert(true)).wasAcknowledged();
                    }
                });
        }
    }, Encoders.BOOLEAN());

StreamingQuery query1 = dataset
    .writeStream()
    .trigger(Trigger.ProcessingTime(1000))
    .foreach(new KafkaSink("metrics"))
    .option("checkpointLocation", getCheckpointPath(CheckpointPath.LOCAL_WRITE) + "/metrics")
    .start();

query1.awaitTermination();

private static ReadConfig getMongoReadConfig(SparkSession sparkSession, String collectionName) {
    ReadConfig readConfig = ReadConfig.create(sparkSession);
    Map<String, String> readOverrides = new HashMap<String, String>();
    readOverrides.put("readConcern.level", "majority");
    readConfig.withOptions(readOverrides);
    return readConfig;
}

private static WriteConfig getMongoWriteConfig(SparkSession sparkSession, String collectionName) {
    WriteConfig writeConfig = WriteConfig.create(sparkSession);
    Map<String, String> writeOverrides = new HashMap<String, String>();
    writeOverrides.put("writeConcern.w", "majority");
    writeConfig.withOptions(writeOverrides);
    return writeConfig;
}
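To make the intent clearer, here is a rough sketch of the same per-record upsert written as a plain Structured Streaming ForeachWriter instead of going through MongoConnector inside a map. The MongoUpsertWriter class name is made up for illustration, it assumes the same mongodb://localhost/analytics.counters target, and I have not verified this variant either:

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import org.apache.spark.sql.ForeachWriter;
import org.bson.Document;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.lt;

// Illustrative sketch only: upserts each JSON payload into analytics.counters,
// keyed by "id" and guarded by the "timestamp" field, mirroring the logic above.
public class MongoUpsertWriter extends ForeachWriter<String> {
    private transient MongoClient client;
    private transient MongoCollection<Document> collection;

    @Override
    public boolean open(long partitionId, long version) {
        // One client per partition/epoch; created here because MongoClient is not serializable.
        client = new MongoClient(new MongoClientURI("mongodb://localhost"));
        collection = client.getDatabase("analytics").getCollection("counters");
        return true;
    }

    @Override
    public void process(String kafkaPayload) {
        Document doc = Document.parse(kafkaPayload);
        String id = doc.getString("id");
        doc.put("_id", id);
        collection.replaceOne(
                and(eq("_id", id), lt("timestamp", doc.getString("timestamp"))),
                doc,
                new UpdateOptions().upsert(true));
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (client != null) {
            client.close();
        }
    }
}

The idea would be to plug it in with jsonDS.writeStream().foreach(new MongoUpsertWriter()) instead of running the Mongo write inside the map and sending the result to the KafkaSink.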
I use spark-submit and pass in the following parameters:
spark-submit --master local[*] \
  --driver-memory 4g \
  --executor-memory 2g \
  --class com.hello.stream.app.Hello \
  --conf "spark.mongodb.input.uri=mongodb://localhost/analytics.counters" \
  --conf "spark.mongodb.output.uri=mongodb://localhost/analytics.counters" \
  build/libs/hello-stream.jar
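I am not sure whether the Mongo Spark connector classes actually end up on the classpath at runtime, since hello-stream.jar is not a fat jar. One variant I could try is pulling the connector in with --packages (assuming the org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 coordinates from the build file below); I have not confirmed this changes anything:

spark-submit --master local[*] \
  --driver-memory 4g \
  --executor-memory 2g \
  --class com.hello.stream.app.Hello \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 \
  --conf "spark.mongodb.input.uri=mongodb://localhost/analytics.counters" \
  --conf "spark.mongodb.output.uri=mongodb://localhost/analytics.counters" \
  build/libs/hello-stream.jar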
Here is the list of dependencies I use:
def sparkVersion = '2.2.0'

compile group: 'org.apache.spark', name: 'spark-core_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: sparkVersion
compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: sparkVersion
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
compile group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: sparkVersion
compile 'org.mongodb:mongodb-driver:3.0.4'
When I run the job I get the following output (a shortened version of the INFO log):
17/09/12 10:16:12 INFO MongoClientCache: Closing MongoClient: [localhost:27017]
17/09/12 10:16:12 INFO connection: Closed connection [connectionId{localValue:2, serverValue:2897}] to localhost:27017 because the pool has been closed.
17/09/12 10:16:18 INFO StreamExecution: Streaming query made progress: {
  "id" : "ddc38876-c44d-4370-a2e0-3c96974e6f24",
  "runId" : "2ae73227-b9e1-4908-97d6-21d9067994c7",
  "name" : null,
  "timestamp" : "2017-09-12T17:16:18.001Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 2,
    "triggerExecution" : 2
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[hello]]",
    "startOffset" : {
      "pn_ingestor_json" : {
        "0" : 826404
      }
    },
    "endOffset" : {
      "pn_ingestor_json" : {
        "0" : 826404
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@7656801e"
  }
}
...and it keeps going, repeatedly printing "INFO StreamExecution: Streaming query made progress:", but I never see the database or collection being created in Mongo.
I also tried passing the class path (--class) in the submit command:
spark-submit --master local[*] \
  --driver-memory 4g \
  --class class/package.path.inside.the.jar \
  --executor-memory 2g \
  --conf "spark.mongodb.input.uri=mongodb://localhost/analytics.counters" \
  --conf "spark.mongodb.output.uri=mongodb://localhost/analytics.counters" \
  build/libs/hello-stream.jar
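As a separate sanity check, I am considering writing a single document with the connector's batch API, outside the streaming query, to rule out connectivity or configuration problems. A minimal sketch of that idea (the writeProbeDocument helper name and the probe document are made up; it reuses WriteConfig.create on the same session):

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: writes one hand-made document to analytics.counters
// to confirm the connector and the output URI work outside the streaming query.
private static void writeProbeDocument(SparkSession sparkSession) {
    JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());

    Map<String, String> overrides = new HashMap<String, String>();
    overrides.put("database", "analytics");
    overrides.put("collection", "counters");
    WriteConfig writeConfig = WriteConfig.create(sparkSession).withOptions(overrides);

    JavaRDD<Document> docs = jsc.parallelize(
            Arrays.asList(Document.parse("{ \"_id\": \"ping\", \"value\": 1 }")));
    MongoSpark.save(docs, writeConfig);
}

If that probe document showed up in analytics.counters, I would assume the connection and configuration are fine and the problem is somewhere in the streaming part of the job.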