Sending files from local folder to FTP using Spring Integration
I'm polling files from one or more FTP servers into a local folder, for example FEFOexportBEY.csv. Once this file appears in the remote directory, it is pulled to my local folder without any issue. I then use it to generate a new file called finalBEY.csv in my local folder, and I want to push that file downstream to the FTP folder the original came from.

My problem is that finalBEY.csv is sent only once. This process runs frequently: if I pull FEFOexportBEY.csv three times per day, I generate finalBEY.csv three times and need to send it downstream three times. Instead, it is sent only the first time, and even if I delete finalBEY.csv and generate a new one, the app does not send it.

Below is my whole code from the config file and the controller file. How can I keep watching or polling the local folder (for example BEY) for a new finalBEY.csv and send it to the destination?
@Configuration
@EnableIntegration
public class FTIntegration {
public static final String TIMEZONE_UTC = "UTC";
public static final String TIMESTAMP_FORMAT_OF_FILES = "yyyyMMddHHmmssSSS";
public static final String TEMPORARY_FILE_SUFFIX = ".part";
public static final int POLLER_FIXED_PERIOD_DELAY = 5000;
public static final int MAX_MESSAGES_PER_POLL = 100;
private static final Logger LOG = LoggerFactory.getLogger(FTIntegration.class);
private static final String CHANNEL_INTERMEDIATE_STAGE = "intermediateChannel";
private static final String OUTBOUND_CHANNEL = "outboundChannel";
/* pulling the server config from postgres DB*/
private final BranchRepository branchRepository;
@Value("${app.temp-dir}")
private String localTempPath;
public FTIntegration(BranchRepository branchRepository) {
this.branchRepository = branchRepository;
}
@Bean
public Branch myBranch(){
return new Branch();
}
/**
* The default poller with 5s, 100 messages, RotatingServerAdvice and transaction.
*
* @return default poller.
*/
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller(){
return Pollers
.fixedDelay(POLLER_FIXED_PERIOD_DELAY)
.maxMessagesPerPoll(MAX_MESSAGES_PER_POLL)
.transactional()
.get();
}
/**
* The direct channel for the flow.
*
* @return MessageChannel
*/
@Bean
public MessageChannel stockIntermediateChannel() {
return new DirectChannel();
}
/**
* Get the files from a remote directory. Add a timestamp to the filename
* and write them to a local temporary folder.
*
* @return IntegrationFlow
*/
//@Bean
public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch){
final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
.preserveTimestamp(true)
//.patternFilter("*.csv")
.maxFetchSize(MAX_MESSAGES_PER_POLL)
.remoteDirectory(myBranch.getFolderPath())
.regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
.deleteRemoteFiles(true)
.localDirectory(new File(myBranch.getBranchCode()))
.localFilter(new AcceptAllFileListFilter())
.temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
.localFilenameExpression(new FunctionExpression<String>(s -> {
final int fileTypeSepPos = s.lastIndexOf('.');
return DateTimeFormatter
.ofPattern(TIMESTAMP_FORMAT_OF_FILES)
.withZone(ZoneId.of(TIMEZONE_UTC))
.format(Instant.now())
+ "_"
+ s.substring(0,fileTypeSepPos)
+ s.substring(fileTypeSepPos);
}));
// Poller definition
final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
.id("stockInboundPoller")
.autoStartup(true)
.poller(poller());
IntegrationFlow flow = IntegrationFlows
.from(sourceSpecFtp, stockInboundPoller)
.transform(File.class, p ->{
// log step
LOG.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
return p;
})
.channel(CHANNEL_INTERMEDIATE_STAGE)
.get();
return flow;
}
@Bean
public IntegrationFlow stockIntermediateStageChannel() {
IntegrationFlow flow = IntegrationFlows
.from(CHANNEL_INTERMEDIATE_STAGE)
.transform(p -> {
//log step
LOG.info("flow=stockIntermediateStageChannel, message=rename file: " + p);
return p;
})
//TODO
.channel(new NullChannel())
.get();
return flow;
}
/*
* Creating the outbound adaptor
*
* */
public IntegrationFlow localToFtpFlow(Branch myBranch){
return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
.filter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(10_000)))
.log()
.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
.useTemporaryFileName(true)
.autoCreateDirectory(true)
.remoteDirectory(myBranch.getFolderPath()))
.get();
}
public interface SendToFtpDirect{
void send(byte[] bytes, @Header(FileHeaders.FILENAME) String filename);
}
public DefaultFtpSessionFactory createNewFtpSessionFactory(Branch branch){
final DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
factory.setHost(branch.getHost());
factory.setUsername(branch.getUsern());
factory.setPort(branch.getFtpPort());
factory.setPassword(branch.getPassword());
return factory;
}
}
Controller class is:
@Controller
public class BranchController {
private BranchService branchService;
private BranchToBranchForm branchToBranchForm;
//@Autowired
private Branch branch;
@Autowired
private FTIntegration.MyGateway myGateway;
@Autowired
private FTIntegration ftIntegration;
@Autowired
private IntegrationFlowContext flowContext;
private FTIntegration.SendToFtpDirect gate;
@Autowired
public void setBranchService(BranchService branchService) {
this.branchService = branchService;
}
@Autowired
public void setBranchToBranchForm(BranchToBranchForm branchToBranchForm) {
this.branchToBranchForm = branchToBranchForm;
}
@RequestMapping( "/")
public String branch(){return "redirect:/branch/list";}
@RequestMapping({"/branch/list","/branch"})
public String listBranches(Model model){
model.addAttribute("branches",branchService.listAll());
return "branch/list";
}
@RequestMapping("/branch/showbranch/{id}")
public String getBranch (@PathVariable String id, Model model){
model.addAttribute("branch", branchService.getById(Long.valueOf(id)));
addFlowFtp(id);
addFlowftpOutbound(id);
return "/branch/showbranch";
}
@RequestMapping("/branch/edit/{id}")
public String edit(@PathVariable String id, Model model){
Branch branch = branchService.getById(Long.valueOf(id));
BranchForm branchForm = branchToBranchForm.convert(branch);
model.addAttribute("branchForm",branchForm);
return "branch/branchform";
}
@RequestMapping("/branch/new")
public String newBranch(Model model){
model.addAttribute("branchForm", new BranchForm());
return "branch/branchform";
}
//@PostMapping
@RequestMapping(value = "/branch", method = RequestMethod.POST)
public String saveOrUpdateBranch(@Valid BranchForm branchForm, BindingResult bindingResult){
if(bindingResult.hasErrors()){
return "branch/branchform";
}
Branch savedBranch = branchService.saveOrUpdateBranchForm(branchForm);
return "redirect:/branch/showbranch/" + savedBranch.getId();
}
@RequestMapping("/branch/delete/{id}")
private String delete(@PathVariable String id){
branchService.delete(Long.valueOf(id));
flowContext.remove(id);
flowContext.remove(id+"o");
return "redirect:/branch/list";
}
private void addFlowFtp(String name) {
branch = branchService.getById(Long.valueOf(name));
System.out.println(branch.getBranchCode());
IntegrationFlow flow = ftIntegration.fileInboundFlowFromFTPServer(branch);
this.flowContext.registration(flow).id(name).register();
}
private void addFlowftpOutbound(String name) {
branch = branchService.getById(Long.valueOf(name));
System.out.println(branch.getBranchCode());
IntegrationFlow flow = ftIntegration.localToFtpFlow(branch);//ftpOutboundFlow(branch);
this.flowContext.registration(flow).id(name +"o").register();
//gate.send("BEY".getBytes(),"final"+ branch.getBranchCode()+ ".csv" );
}
}
spring spring-integration
What version of Spring Integration?
– Gary Russell
Nov 28 '18 at 15:33
@Gary Russell I'm using Spring Integration core 5.0.6, Spring Integration file 5.0.7 and Spring Integration FTP 5.0.6.
– Elias Khattar
Nov 29 '18 at 7:28
asked Nov 28 '18 at 14:25 by Elias Khattar (edited Dec 4 '18 at 13:25)
What version of Spring Integration?
– Gary Russell
Nov 28 '18 at 15:33
I'm using spring-integration-core 5.0.6, spring-integration-file 5.0.7, and spring-integration-ftp 5.0.6. @Gary Russell
– Elias Khattar
Nov 29 '18 at 7:28
1 Answer
If you are using a recent version, a new version of the file should pass, as long as its modified timestamp has changed.
See the documentation.
You can use the local-filter attribute to configure the behavior of the local file system filter. Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter is configured by default. This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore strategy (see Section 12.5, “Metadata Store”) and detects changes to the local file modified time. The default MetadataStore is a SimpleMetadataStore, which stores state in memory.
Check what's in the local filter; also turn on DEBUG logging to see if that provides you with more information.
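To make the rule in the quoted documentation concrete, here is a minimal plain-Java sketch of an accept-once filter keyed on the modified timestamp. The class name `ModifiedTimeAcceptOnceFilter` and its `accept` method are hypothetical and are not Spring Integration's implementation; the in-memory map only stands in for the metadata store, on the assumption that a file passes when it is new or when its timestamp differs from the stored one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of an accept-once filter keyed on modified time.
// It illustrates the rule described above; it is NOT the library's code.
final class ModifiedTimeAcceptOnceFilter {

    // Stands in for an in-memory metadata store: file name -> last seen timestamp.
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    // Accepts a file the first time it is seen, and again whenever its
    // modified timestamp differs from the one recorded earlier.
    boolean accept(String fileName, long lastModified) {
        Long previous = store.put(fileName, lastModified);
        return previous == null || previous != lastModified;
    }

    public static void main(String[] args) {
        ModifiedTimeAcceptOnceFilter filter = new ModifiedTimeAcceptOnceFilter();
        System.out.println(filter.accept("finalBEY.csv", 1000L)); // true: first sighting
        System.out.println(filter.accept("finalBEY.csv", 1000L)); // false: unchanged
        System.out.println(filter.accept("finalBEY.csv", 2000L)); // true: regenerated
    }
}
```

Under this model, a regenerated finalBEY.csv is only sent again if its modified time actually changes and the same filter instance (and store) sees both versions.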
EDIT
This works fine for me...
@SpringBootApplication
public class So53521657Application {
public static void main(String[] args) {
SpringApplication.run(So53521657Application.class, args);
}
@Bean
public CachingSessionFactory<FTPFile> sf() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("10.0.0.3");
sf.setUsername("ftptest");
sf.setPassword("ftptest");
return new CachingSessionFactory<>(sf);
}
@Bean
public IntegrationFlow webToFtpFlow() {
return IntegrationFlows.from(SendToFtpDirect.class)
.log()
.handle(Ftp.outboundAdapter(sf()).remoteDirectory("foo"))
.get();
}
@Bean
public IntegrationFlow ftpToLocalFlow() {
return IntegrationFlows.from(Ftp.inboundAdapter(sf())
.remoteDirectory("foo")
.deleteRemoteFiles(true)
.localFilter(new SimplePatternFileListFilter("*.csv"))
.localDirectory(new File("/tmp/foo")), e ->
e.poller(Pollers.fixedDelay(5_000)))
.log()
.<File>handle((p, h) -> {
File newFile = new File("/tmp/bar/" + p.getName().replace(".csv", ".txt"));
newFile.delete();
System.out.println("renaming " + p + " to " + newFile);
p.renameTo(newFile);
return p;
})
.log()
.nullChannel();
}
@Bean
public IntegrationFlow localToFtpFlow() {
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/bar"))
.filter(new FileSystemPersistentAcceptOnceFileListFilter(
new SimpleMetadataStore(), "foo")), e ->
e.poller(Pollers.fixedDelay(10_000)))
.log()
.handle(Ftp.outboundAdapter(sf())
.remoteDirectory("bar"))
.get();
}
}
@RestController
@DependsOn("webToFtpFlow")
class Controller {
@Autowired
private SendToFtpDirect gate;
@GetMapping(path = "/send/{name}")
public String send(@PathVariable String name) {
gate.send("foo".getBytes(), name + ".csv");
return name + " sent";
}
}
interface SendToFtpDirect {
void send(byte[] bytes, @Header(FileHeaders.FILENAME) String filename);
}
I have been troubleshooting this for a few hours but could not find a solution. As I replied above, my Spring Integration version is 5.0.6. I tried modifying the file completely so that its date would change, but it is still not picked up. I also tried debugging and could not pinpoint why. Do you see any problem with the outbound adapter or the gateway I'm using? When I use the send method in my controller, does that mean it will keep looking in the same folder for that CSV file? It seems weird to me that it should be picking the file up and does not. @Gary Russell
– Elias Khattar
Nov 29 '18 at 10:16
I don't think it has anything to do with the outbound adapter. If you can't work it out from the DEBUG log, post the complete log someplace like pastebin.
– Gary Russell
Nov 29 '18 at 13:54
Not much shows up in the DEBUG log. I edited my code to use localFilter(new AcceptAllFileListFilter()), but still nothing gets picked up. I also tried FileSystemPersistentAcceptOnceFileListFilter with a SimpleMetadataStore as the first argument, and still nothing. What else can I do to get this sorted? Do you have an example I can look at to see how this is configured, or could I send you the app so you can try running it from your end? Thanks. @Gary Russell
– Elias Khattar
Dec 3 '18 at 11:35
I edited the answer with an app that works fine for me.
– Gary Russell
Dec 3 '18 at 15:55
I updated my project to look somewhat similar to your example for sending the file to FTP, and it is working smoothly now. I just need to make it send a specific CSV file rather than all the CSV files from the local folder to the server, so I might use a regex as I did in my inbound adapter that pulls from the server to my local folder; I guess that would be the better solution. Note that I did not use the interface to send the file; I just created a flow and register it when a new branch is added from my web MVC. I have edited my code above. @Gary Russell
– Elias Khattar
Dec 4 '18 at 13:24
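The regex idea from the last comment could look roughly like the plain-Java sketch below. It assumes the naming convention "final" + branch code + ".csv" that appears in the question's controller code; the class `FinalFileSelector` and its methods are hypothetical helpers, not part of Spring Integration. A pattern built this way could then be given to a regex-based file list filter such as `RegexPatternFileListFilter`, combined with the accept-once filter so that unchanged files are not resent.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: selects only the generated "final<branchCode>.csv" file.
final class FinalFileSelector {

    // Builds a pattern for one branch; quote() keeps the branch code literal.
    static Pattern patternFor(String branchCode) {
        return Pattern.compile("final" + Pattern.quote(branchCode) + "\\.csv");
    }

    // Keeps only names that match the whole pattern, so temporary files such
    // as "finalBEY.csv.part" and other branches' files are skipped.
    static List<String> select(List<String> fileNames, String branchCode) {
        Pattern pattern = patternFor(branchCode);
        return fileNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "FEFOexportBEY.csv", "finalBEY.csv", "finalBEY.csv.part", "finalDXB.csv");
        System.out.println(select(names, "BEY")); // prints [finalBEY.csv]
    }
}
```

Matching the full file name rather than a substring is the design choice that matters here: it keeps the original FEFOexportBEY.csv and any partially written files out of the outbound flow.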
answered Nov 28 '18 at 16:19, edited Dec 3 '18 at 15:54
Gary Russell