Spring Integration DSL - Parallel processing
I have an integration flow (productHoldingsFlow) in which an HTTP response is split and routed to 3 sub flows. Each sub flow's output channel is a different integration flow. Each of these integration flows makes 3 different HTTP calls, aggregates the results, and sends the response back. The splitter's output channel is an executor channel, so the messages are sent to the sub flows on independent threads. This seems to work fine. What I want is for each sub flow's integration flow to call its 3 APIs in parallel; at the moment they are executed sequentially. Any help here is appreciated. Below is a snippet of the integration flow code.
Main integration flow

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedDelay(1000).get();
}

/**
 * <p>Account list orchestration flow entry</p>
 *
 * @return
 */
@Bean
public IntegrationFlow productHoldingsFlow() {
    return flow -> flow.publishSubscribeChannel(s -> s
            // Product holdings SAPI
            .subscribe(f -> f
                    .enrichHeaders(h -> h.headerExpression("customerid", "payload[0]"))
                    .handle(Http.outboundGateway(productHoldingsUri)
                            .httpMethod(HttpMethod.GET)
                            .expectedResponseType(ProductHoldingsResponse.class)
                            .errorHandler(new DefaultResponseErrorHandler()))
                    .log("after product holdings")
                    .split(ProductHoldingsResponse.class,
                            ProductHoldingsResponse::getProductArrangementRoleRelationship)
                    .channel(c -> c.executor(Executors.newCachedThreadPool()))
                    .<ProductArrangementRoleRelationship, String>route(
                            ProductArrangementRoleRelationship::getProductSystemCode, m -> m
                                    .subFlowMapping(CORE_BANKING.sysCode(), sf -> sf
                                            .channel(c -> c.queue(10))
                                            .channel("currentAccountFlow.input"))
                                    .subFlowMapping(CREDIT_CARDS.sysCode(), sf -> sf
                                            .channel(c -> c.queue(10))
                                            .channel("creditCardFlow.input"))
                                    .subFlowMapping(SHARE_DEALING.sysCode(), sf -> sf
                                            .channel(c -> c.queue(10))
                                            .channel("shareDealingFlow.input")))
            ));
}
Sub flow - integration flow

@Bean
public IntegrationFlow currentAccountFlow() {
    return flow -> flow.publishSubscribeChannel(s -> s
            // First HTTP call: current account details
            .subscribe(sub -> sub
                    .handle(Http.outboundGateway(currentAccountUri)
                            .httpMethod(HttpMethod.GET)
                            .expectedResponseType(String.class)
                            .errorHandler(new AccountListErrorHandler()))
                    .transform(new JsonToObjectTransformer(CurrentAccount.class))
                    .channel("aggregateCurrentAccount.input"))
            // Second HTTP call: product configuration
            .subscribe(sub -> sub
                    .handle(Http.outboundGateway(productConfitUri)
                            .httpMethod(HttpMethod.GET)
                            .expectedResponseType(String.class)
                            .errorHandler(new AccountListErrorHandler()))
                    .transform(new JsonToObjectTransformer(ProductConfig.class))
                    .channel("aggregateCurrentAccount.input"))
    );
}

@Bean
public IntegrationFlow aggregateCurrentAccount() {
    return flow -> flow
            // Release the group once both HTTP results have arrived
            .aggregate(a -> a.releaseExpression("size() == 2")
                    .expireGroupsUponCompletion(true)
                    .outputProcessor(g -> g.getMessages()
                            .stream()
                            .filter(m -> m.getPayload() != null)
                            .map(Message::getPayload)
                            .collect(Collectors.toCollection(ArrayList::new))
                            .toArray()))
            .channel("aggregateAccountList.input");
}
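To make the goal concrete, here is a minimal plain-Java sketch of the behaviour I am after, with no Spring involved: every call is handed to its own pool thread and the results are collected only once all of them have returned. The class name ParallelFanOut, the method callInParallel, and the supplier lambdas are hypothetical stand-ins for the HTTP outbound gateways above, not part of my real code. In the DSL, one option that may give the same effect is the publishSubscribeChannel overload that takes an Executor as its first argument, but I have not verified that against this flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ParallelFanOut {

    // Submits every call to its own pool thread, then waits for all of them.
    // Results are returned in the same order as the input calls.
    static <T> List<T> callInParallel(List<Supplier<T>> calls) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, calls.size()));
        try {
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (Supplier<T> call : calls) {
                // Each supplier starts running as soon as it is submitted
                futures.add(CompletableFuture.supplyAsync(call, pool));
            }
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                // join() blocks until that particular call has finished
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the two HTTP calls in currentAccountFlow
        List<Supplier<String>> calls = List.of(
                () -> "currentAccount",
                () -> "productConfig");
        System.out.println(callInParallel(calls));
    }
}
```

The collection step plays the role of the aggregator with releaseExpression("size() == 2"): nothing is released downstream until both results are present.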