Dataflow DynamicDestinations: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite
I'm trying to use DynamicDestinations to write to a partitioned table in BigQuery, where the partition name is mytable$yyyyMMdd. If I bypass DynamicDestinations and supply a hardcoded table name in .to(), it works; however, with DynamicDestinations I get the following exception:
    java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1@6fff253c
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
        at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:51)
        at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:36)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:297)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:987)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:972)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:659)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
        at com.homedepot.payments.monitoring.eventprocessor.MetricsAggregator.main(MetricsAggregator.java:82)
    Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableReference
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
And here is the code:
    PCollection<Event> rawEvents = pipeline
        .apply("ReadFromPubSub",
            PubsubIO.readProtos(EventOuterClass.Event.class)
                .fromSubscription(options.getSubscription()))
        .apply("Parse", ParDo.of(new ParseFn()))
        .apply("ExtractAttributes", ParDo.of(new ExtractAttributesFn()));

    EventTable table = new EventTable(options.getProjectId(),
        options.getMetricsDatasetId(), options.getRawEventsTable());

    rawEvents.apply(BigQueryIO.<Event>write()
        .to(new DynamicDestinations<Event, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableSchema getSchema(String destination) {
                return table.schema();
            }

            @Override
            public TableDestination getTable(String destination) {
                return new TableDestination(table.reference(), null);
            }

            @Override
            public String getDestination(ValueInSingleWindow<Event> element) {
                String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                    .withZone(DateTimeZone.UTC).toString();
                return table.reference().getTableId() + "$" + dayString;
            }
        })
        .withFormatFunction(new SerializableFunction<Event, TableRow>() {
            public TableRow apply(Event event) {
                TableRow row = new TableRow();
                Event evnt = (Event) event;
                row.set(EventTable.Field.VERSION.getName(), evnt.getVersion());
                row.set(EventTable.Field.TIMESTAMP.getName(), evnt.getTimestamp() / 1000);
                row.set(EventTable.Field.EVENT_TYPE_ID.getName(), evnt.getEventTypeId());
                row.set(EventTable.Field.EVENT_ID.getName(), evnt.getId());
                row.set(EventTable.Field.LOCATION.getName(), evnt.getLocation());
                row.set(EventTable.Field.SERVICE.getName(), evnt.getService());
                row.set(EventTable.Field.HOST.getName(), evnt.getHost());
                row.set(EventTable.Field.BODY.getName(), evnt.getBody());
                return row;
            }
        })
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Any pointers in the right direction would be appreciated. Thanks!
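Separately from the serialization error, getDestination in the question would not produce the intended partition name even once it serializes. Calling toString() on a Joda DateTimeFormatter does not format any date (print(...) does that). The pattern also needs uppercase MM for the month, because lowercase mm is minute-of-hour. Finally, getTable ignores its destination argument, so the $ decorator never reaches BigQuery. The sketch below isolates just the formatting step, using java.time so it runs on a plain JDK. The class and method names are hypothetical. In a Beam pipeline, the Joda timestamp from element.getTimestamp() could be converted with Instant.ofEpochMilli(element.getTimestamp().getMillis()).

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionDecorator {
    // "MM" is month-of-year; lowercase "mm" would be minute-of-hour.
    // The zone is fixed to UTC so the day boundary does not depend on the worker's locale.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Hypothetical helper: builds "tableId$yyyyMMdd" for the UTC day of the given instant. */
    static String partitionFor(String tableId, Instant timestamp) {
        return tableId + "$" + DAY.format(timestamp);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2017-08-03T23:59:59Z");
        System.out.println(partitionFor("mytable", t)); // prints mytable$20170803

        // With lowercase "mm" the minutes leak into the middle: prints 20175903
        System.out.println(DateTimeFormatter.ofPattern("yyyymmdd").withZone(ZoneOffset.UTC).format(t));
    }
}
```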
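From inspecting the exception message and the code above, it seems that the EventTable used within the anonymous DynamicDestinations class contains a TableReference field that is not serializable. BigQueryIO wraps the DynamicDestinations in a DoFn (the PrepareWrite$1 in the trace), and ParDo.of serializes that DoFn. Because the anonymous class captured the local variable table, Java serialization must also write the EventTable and, with it, the TableReference.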
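The mechanism can be reproduced with the JDK alone. In this sketch, TableRef, EventTable and Destinations are hypothetical stand-ins for TableReference, the question's EventTable and DynamicDestinations. canSerialize does roughly what Beam's SerializableUtils.serializeToByteArray does: it writes the object with an ObjectOutputStream. The anonymous class fails because the captured table becomes a hidden field, and the reported class is the non-serializable TableRef nested inside it, mirroring the "Caused by" line above. The static nested class, which copies only a String, succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {
    // Stand-in for TableReference: deliberately NOT Serializable.
    static class TableRef {
        final String tableId;
        TableRef(String tableId) { this.tableId = tableId; }
    }

    // Stand-in for the question's EventTable: Serializable itself, but holds a TableRef.
    static class EventTable implements Serializable {
        final TableRef ref = new TableRef("mytable");
    }

    // Stand-in for DynamicDestinations, which implements Serializable.
    interface Destinations extends Serializable {
        String destination();
    }

    /** Java-serializes the object, roughly as Beam's SerializableUtils does. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // The message is the name of the offending class.
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Anonymous class: the captured parameter becomes a synthetic field that gets serialized.
    static Destinations anonymous(EventTable table) {
        return new Destinations() {
            @Override public String destination() { return table.ref.tableId; }
        };
    }

    // Static nested class: keeps only the serializable String it actually needs.
    static class CopyingDestinations implements Destinations {
        private final String tableId;
        CopyingDestinations(EventTable table) { this.tableId = table.ref.tableId; }
        @Override public String destination() { return tableId; }
    }

    public static void main(String[] args) {
        EventTable table = new EventTable();
        // prints "NotSerializableException: CaptureDemo$TableRef", then "false"
        System.out.println(canSerialize(anonymous(table)));
        // prints "true"
        System.out.println(canSerialize(new CopyingDestinations(table)));
    }
}
```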
One workaround is to convert the anonymous DynamicDestinations into a static nested class and define a constructor that stores only the serializable pieces of EventTable needed to implement the interface.
For example:
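    private static class EventDestinations extends DynamicDestinations<Event, String> {
        // TableSchema, like TableReference, is not java.io.Serializable,
        // so keep the schema as a JSON string and parse it again in getSchema().
        private final String schemaJson;
        private final TableDestination destination;
        private final String tableId;

        private EventDestinations(EventTable table) throws IOException {
            this.schemaJson = JacksonFactory.getDefaultInstance().toString(table.schema());
            this.destination = new TableDestination(table.reference(), null);
            this.tableId = table.reference().getTableId();
        }

        // ...
    }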
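A fuller sketch of that idea, under a few assumptions. The class name DailyPartitionDestinations and its (TableReference, TableSchema) constructor are hypothetical; the question's EventTable is not shown, so the sketch takes those two pieces directly. It routes each element to the partition for its element timestamp, which for Pub/Sub sources defaults to the publish time. It converts the schema to JSON with JacksonFactory from google-http-client-jackson2, which typically comes in transitively with Beam's GCP IO module; newer client releases may prefer GsonFactory. Storing TableSchema directly as a field would hit the same NotSerializableException, because it is not Serializable either.

```java
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/** Hypothetical: routes each element to "table$yyyyMMdd" for its UTC element timestamp. */
public class DailyPartitionDestinations<T> extends DynamicDestinations<T, String> {
    private static final long serialVersionUID = 1L;

    // Static fields are not part of the serialized instance.
    private static final JsonFactory JSON = JacksonFactory.getDefaultInstance();
    private static final DateTimeFormatter DAY =
        DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

    // Only Strings are stored, so Java serialization succeeds.
    private final String projectId;
    private final String datasetId;
    private final String tableId;
    private final String schemaJson;

    public DailyPartitionDestinations(TableReference ref, TableSchema schema) {
        this.projectId = ref.getProjectId();
        this.datasetId = ref.getDatasetId();
        this.tableId = ref.getTableId();
        try {
            this.schemaJson = JSON.toString(schema);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public String getDestination(ValueInSingleWindow<T> element) {
        // e.g. "mytable$20170803" for an element timestamped on 2017-08-03 (UTC)
        return tableId + "$" + DAY.print(element.getTimestamp());
    }

    @Override
    public TableDestination getTable(String destination) {
        // Unlike the question's version, this uses the destination, so the decorator is kept.
        return new TableDestination(projectId + ":" + datasetId + "." + destination, null);
    }

    @Override
    public TableSchema getSchema(String destination) {
        try {
            return JSON.fromString(schemaJson, TableSchema.class);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public Coder<String> getDestinationCoder() {
        return StringUtf8Coder.of();
    }
}
```

In the question's pipeline this would replace the anonymous class with something like .to(new DailyPartitionDestinations<Event>(table.reference(), table.schema())). getDestinationCoder() is overridden so that Beam does not have to infer the destination coder through the generic type parameter.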