Dataflow DynamicDestinations: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite


I'm trying to use DynamicDestinations to write to a partitioned table in BigQuery, where the partition name is mytable$yyyyMMdd. If I bypass DynamicDestinations and supply a hardcoded table name in .to(), it works; however, with DynamicDestinations I get the following exception:

    java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1@6fff253c
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
        at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:51)
        at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:36)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:297)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:987)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:972)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:659)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
        at com.homedepot.payments.monitoring.eventprocessor.MetricsAggregator.main(MetricsAggregator.java:82)
    Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableReference
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

And here is the code:

    PCollection<Event> rawEvents = pipeline
        .apply("ReadFromPubsub",
            PubsubIO.readProtos(EventOuterClass.Event.class)
                .fromSubscription(options.getSubscription())
        )
        .apply("Parse", ParDo.of(new ParseFn()))
        .apply("ExtractAttributes", ParDo.of(new ExtractAttributesFn()));

    EventTable table = new EventTable(options.getProjectId(), options.getMetricsDatasetId(), options.getRawEventsTable());
    rawEvents.apply(BigQueryIO.<Event>write()
        .to(new DynamicDestinations<Event, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableSchema getSchema(String destination) {
                return table.schema();
            }

            @Override
            public TableDestination getTable(String destination) {
                // Note: this ignores destination, so the $yyyyMMdd suffix
                // computed in getDestination() is not used here.
                return new TableDestination(table.reference(), null);
            }

            @Override
            public String getDestination(ValueInSingleWindow<Event> element) {
                // Format the element's timestamp; calling toString() on the
                // formatter itself would not produce a yyyyMMdd date string.
                String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                    .withZone(DateTimeZone.UTC)
                    .print(element.getTimestamp());
                return table.reference().getTableId() + "$" + dayString;
            }
        })
        .withFormatFunction(new SerializableFunction<Event, TableRow>() {
            public TableRow apply(Event event) {
                TableRow row = new TableRow();
                Event evnt = (Event) event;
                row.set(EventTable.Field.VERSION.getName(), evnt.getVersion());
                row.set(EventTable.Field.TIMESTAMP.getName(), evnt.getTimestamp() / 1000);
                row.set(EventTable.Field.EVENT_TYPE_ID.getName(), evnt.getEventTypeId());
                row.set(EventTable.Field.EVENT_ID.getName(), evnt.getId());
                row.set(EventTable.Field.LOCATION.getName(), evnt.getLocation());
                row.set(EventTable.Field.SERVICE.getName(), evnt.getService());
                row.set(EventTable.Field.HOST.getName(), evnt.getHost());
                row.set(EventTable.Field.BODY.getName(), evnt.getBody());
                return row;
            }
        })
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    );

Any pointers in the right direction would be appreciated. Thanks!

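From inspecting the exception message and the code above, it seems that the table variable (an EventTable) used within the anonymous DynamicDestinations class contains a TableReference field, which is not serializable. The anonymous class captures table as a hidden field, so when BigQueryIO builds its ParDo inside PrepareWrite and Java-serializes the DynamicDestinations (SerializableUtils.clone in the trace), serialization reaches that TableReference and fails.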
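The failure can be reproduced without Beam, because it comes down to plain Java serialization of an anonymous class that captures a non-serializable object. The sketch below is a minimal stand-in under that assumption: TableRef, Destinations, SafeDestinations and CaptureDemo are hypothetical names playing the roles of TableReference, DynamicDestinations and the workaround class, and canSerialize only mimics what SerializableUtils.serializeToByteArray does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for the question's types (not Beam classes).
class CaptureDemo {

  // Plays the role of TableReference: a plain class that is NOT Serializable.
  static class TableRef {
    final String tableId;
    TableRef(String tableId) { this.tableId = tableId; }
  }

  // Plays the role of DynamicDestinations: a Serializable base type.
  abstract static class Destinations implements Serializable {
    abstract String destinationFor(String day);
  }

  // Plain Java serialization, similar in spirit to SerializableUtils.serializeToByteArray.
  static boolean canSerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      System.out.println("NotSerializableException: " + e.getMessage());
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Anonymous subclass: the captured parameter ref becomes a hidden field.
  static Destinations anonymous(TableRef ref) {
    return new Destinations() {
      @Override
      String destinationFor(String day) { return ref.tableId + "$" + day; }
    };
  }

  // Static nested class that copies out only the String it needs.
  static class SafeDestinations extends Destinations {
    private final String tableId;
    SafeDestinations(TableRef ref) { this.tableId = ref.tableId; }

    @Override
    String destinationFor(String day) { return tableId + "$" + day; }
  }

  public static void main(String[] args) {
    TableRef ref = new TableRef("mytable");
    System.out.println(canSerialize(anonymous(ref)));            // false
    System.out.println(canSerialize(new SafeDestinations(ref))); // true
  }
}
```

The anonymous version fails for the same reason as in the trace: the captured object travels with the instance. The static nested version serializes because it holds only a String.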

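One workaround is to convert the anonymous DynamicDestinations into a static nested class and define a constructor that stores only the serializable pieces of EventTable needed to implement the interface, then pass new EventDestinations(table) to .to(). Note that TableSchema, like TableReference, does not implement Serializable, so it is safer to keep the schema as a JSON string and parse it back in getSchema(). TableDestination itself is Serializable.

For example:

    private static class EventDestinations extends DynamicDestinations<Event, String> {
      private static final long serialVersionUID = 1L;

      // TableSchema is not Serializable either, so keep it as a JSON string.
      private final String schemaJson;
      // TableDestination implements Serializable.
      private final TableDestination destination;
      private final String tableId;

      private EventDestinations(EventTable table) {
        this.schemaJson = BigQueryHelpers.toJsonString(table.schema());
        this.destination = new TableDestination(table.reference(), null);
        this.tableId = table.reference().getTableId();
      }

      @Override
      public TableSchema getSchema(String dest) {
        return BigQueryHelpers.fromJsonString(schemaJson, TableSchema.class);
      }

      // ..
    }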
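Filling in getDestination() for EventDestinations only needs the stored tableId plus a day string taken from the element's timestamp. A minimal sketch of that formatting, using java.time in place of the Joda-Time DateTimeFormat the question uses; PartitionDecorator and forTimestamp are hypothetical names, and it assumes one partition per UTC day.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds "table$yyyyMMdd" partition decorators.
class PartitionDecorator {
  // "MM" is month of year; lowercase "mm" would be minutes.
  private static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  // Appends the UTC day of the given timestamp to the table ID.
  static String forTimestamp(String tableId, Instant timestamp) {
    return tableId + "$" + DAY.format(timestamp);
  }

  public static void main(String[] args) {
    Instant ts = Instant.parse("2017-06-01T23:30:00Z");
    System.out.println(forTimestamp("mytable", ts)); // mytable$20170601
  }
}
```

Because Beam element timestamps are Joda Instants, inside getDestination(ValueInSingleWindow<Event> element) the call would look like forTimestamp(tableId, Instant.ofEpochMilli(element.getTimestamp().getMillis())). getTable(destination) would then need to build its TableDestination from that decorated name (for example a spec like project:dataset.mytable$20170601) rather than returning one fixed destination.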
