Spring Integration DSL - Parallel processing


I have an integration flow (productHoldingsFlow) in which an HTTP response is split and routed to three sub flows. The output channel of each sub flow is the input of a different integration flow. Each of these integration flows makes three different HTTP calls, aggregates the results, and sends the response back. The splitter's output channel is an executor channel, so the messages are sent to the sub flows on independent threads, and that seems to work fine. What I want is for each sub flow's integration flow to call its three APIs in parallel; at the moment they are executed sequentially. Any help here would be appreciated. Below is a snippet of the integration flow code.
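To make the difference between the two behaviours concrete, here is a minimal plain-JDK sketch (no Spring involved) of the same fan-out and aggregate idea. The class `ParallelFanOut`, the helper `fakeCall`, and the delays are hypothetical stand-ins for the blocking HTTP calls; this is an illustration of the concept, not the Spring Integration mechanism itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class ParallelFanOut {

    // Hypothetical stand-in for one blocking HTTP call: sleeps, then returns its name.
    public static Supplier<String> fakeCall(String name, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return name;
        };
    }

    // Sequential: every call runs on the caller's thread, one after the other.
    public static List<String> callSequentially(List<Supplier<String>> calls) {
        return calls.stream().map(Supplier::get).collect(Collectors.toList());
    }

    // Parallel: submit every call to the pool first, then wait for all results.
    // The result order follows the order of the input list.
    public static List<String> callInParallel(List<Supplier<String>> calls, ExecutorService pool) {
        List<CompletableFuture<String>> futures = calls.stream()
                .map(call -> CompletableFuture.supplyAsync(call, pool))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Supplier<String>> calls = Arrays.asList(
                fakeCall("currentAccount", 200),
                fakeCall("productConfig", 200),
                fakeCall("third", 200));
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            List<String> sequential = callSequentially(calls);
            long sequentialMs = (System.nanoTime() - start) / 1_000_000;

            start = System.nanoTime();
            List<String> parallel = callInParallel(calls, pool);
            long parallelMs = (System.nanoTime() - start) / 1_000_000;

            System.out.println("sequential " + sequential + " took " + sequentialMs + " ms");
            System.out.println("parallel   " + parallel + " took " + parallelMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

In the sequential variant the total time is roughly the sum of the individual call times, while in the parallel variant it is closer to the slowest single call, which is the behaviour being asked for here.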

main integration flow

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedDelay(1000).get();
}

/**
 * <p>Account list orchestration flow entry</p>
 *
 * @return
 */
@Bean
public IntegrationFlow productHoldingsFlow() {
    return flow -> flow.publishSubscribeChannel(s -> s
            // product holdings sapi
            .subscribe(f -> f
                    .enrichHeaders(h -> h.headerExpression("customerid", "payload[0]"))
                    .handle(Http.outboundGateway(productHoldingsUri)
                            .httpMethod(HttpMethod.GET)
                            .expectedResponseType(ProductHoldingsResponse.class)
                            .errorHandler(new DefaultResponseErrorHandler()))
                    .log("after product holdings")
                    .split(ProductHoldingsResponse.class,
                            ProductHoldingsResponse::getProductArrangementRoleRelationship)
                    .channel(c -> c.executor(Executors.newCachedThreadPool()))
                    .<ProductArrangementRoleRelationship, String>route(
                            ProductArrangementRoleRelationship::getProductSystemCode,
                            m -> m
                                    .subFlowMapping(CORE_BANKING.sysCode(), sf -> sf
                                            .channel(c -> c.queue(10))
                                            .channel("currentAccountFlow.input"))
                                    .subFlowMapping(CREDIT_CARDS.sysCode(), sf -> sf
                                            .channel(c -> c.queue(10))
                                            .channel("creditCardFlow.input"))
                                    .subFlowMapping(SHARE_DEALING.sysCode(), sf -> sf
                                            .channel(c -> c.queue(10))
                                            .channel("shareDealingFlow.input"))
                    )
            )
    );
}

sub flow - integration flow

@Bean
public IntegrationFlow currentAccountFlow() {
    return flow -> flow.publishSubscribeChannel(s -> s
            .subscribe(sub -> sub
                    .handle(Http.outboundGateway(currentAccountUri)
                            .httpMethod(HttpMethod.GET)
                            .expectedResponseType(String.class)
                            .errorHandler(new AccountListErrorHandler()))
                    .transform(new JsonToObjectTransformer(CurrentAccount.class))
                    .channel("aggregateCurrentAccount.input"))
            .subscribe(sub -> sub
                    .handle(Http.outboundGateway(productConfitUri)
                            .httpMethod(HttpMethod.GET)
                            .expectedResponseType(String.class)
                            .errorHandler(new AccountListErrorHandler()))
                    .transform(new JsonToObjectTransformer(ProductConfig.class))
                    .channel("aggregateCurrentAccount.input"))
    );
}

@Bean
public IntegrationFlow aggregateCurrentAccount() {
    return flow -> flow
            // Release the group once both responses have arrived
            .aggregate(a -> a
                    .releaseExpression("size() == 2")
                    .expireGroupsUponCompletion(true)
                    .outputProcessor(g -> g.getMessages()
                            .stream()
                            .filter(m -> m.getPayload() != null)
                            .map(Message::getPayload)
                            .collect(Collectors.toCollection(ArrayList::new))
                            .toArray()))
            .channel("aggregateAccountList.input");
}
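One possible way to run the subscribers of the sub flow above concurrently is the `publishSubscribeChannel` overload that takes an `Executor` as its first argument, so each subscriber is dispatched on a pool thread instead of the caller's thread. The sketch below is a configuration fragment under stated assumptions: the class name, the bean name `parallelCurrentAccountFlow`, and the two URIs are hypothetical, the payloads are left as plain strings, and the `aggregateCurrentAccount` flow from the snippet above is assumed to exist. It needs Spring Integration (with the HTTP module) on the classpath.

```java
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.http.dsl.Http;

@Configuration
public class ParallelCurrentAccountConfig {

    // Hypothetical endpoints standing in for the URIs used in the original sub flow.
    private static final String CURRENT_ACCOUNT_URI = "http://localhost:8080/current-account";
    private static final String PRODUCT_CONFIG_URI = "http://localhost:8080/product-config";

    @Bean
    public IntegrationFlow parallelCurrentAccountFlow() {
        return flow -> flow.publishSubscribeChannel(
                // The executor makes the channel hand each subscriber its message on a pool thread.
                Executors.newCachedThreadPool(),
                s -> s
                        .subscribe(sub -> sub
                                .handle(Http.outboundGateway(CURRENT_ACCOUNT_URI)
                                        .httpMethod(HttpMethod.GET)
                                        .expectedResponseType(String.class))
                                // Assumes the aggregateCurrentAccount flow is defined elsewhere.
                                .channel("aggregateCurrentAccount.input"))
                        .subscribe(sub -> sub
                                .handle(Http.outboundGateway(PRODUCT_CONFIG_URI)
                                        .httpMethod(HttpMethod.GET)
                                        .expectedResponseType(String.class))
                                .channel("aggregateCurrentAccount.input")));
    }
}
```

Without the executor argument the publish-subscribe channel invokes its subscribers one after another on the sending thread, which would explain the sequential behaviour described in the question.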

