当前位置: 首页 > news >正文

Spring Integration SFTP集成

问题

需要通过Spring使用SFTP上传文件。

思路

集成Spring Integration SFTP进行文件上传。

步骤

pom.xml

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-sftp</artifactId><version>6.3.3</version>
</dependency>

SftpConfig.java

package com.xxx.auth.config;import com.xxx.auth.service.ICxxpAuditLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.sshd.sftp.client.SftpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.integration.sftp.outbound.SftpMessageHandler;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.CollectionUtils;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;@RefreshScope
@Slf4j
@Configuration
public class SftpConfig {/*** log id key 调用的时候,通过消息传入进来的id数据key*/public static final String LOG_ID = "log_id";@Value("${sftp.host:localhost}")private String host;@Value("${sftp.port:22}")private int port;@Value("${sftp.user:foo}")private String user;@Value("${sftp.password:foo}")private String password;@Value("${sftp.remote.directory:/}")private String sftpRemoteDirectory;@Value("${sftp.appCode:KCGLPT}")private String appCode;@Resourceprivate ICxxpAuditLogService iCxxpAuditLogService;@Beanpublic MessageChannel sftpSuccessChannel() {return new DirectChannel(); // 成功的消息通道}@Beanpublic SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);factory.setHost(host);factory.setPort(port);factory.setUser(user);factory.setPassword(password);factory.setAllowUnknownKeys(true);return new CachingSessionFactory<>(factory);}@Bean@ServiceActivator(inputChannel = "toSftpChannel", adviceChain = "after")public MessageHandler handler() {SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));handler.setFileNameGenerator(message -> {// 获取当前时间LocalDateTime currentTime = LocalDateTime.now();// 定义时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("_yyyy_MM_dd_HH_mm_ss");// 将当前时间格式化为字符串String formattedTime = currentTime.format(formatter);// 定义上传文件名return String.format("%s%s.log", appCode, formattedTime);});return handler;}@Beanpublic ExpressionEvaluatingRequestHandlerAdvice after() {ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();advice.setPropagateEvaluationFailures(true);advice.setSuccessChannelName("sftpSuccessChannel"); // 设置成功通道的名称advice.setFailureChannelName("sftpFailureChannel"); // 设置上传失败通道名称return advice;}// 监听成功的消息通道@ServiceActivator(inputChannel = "sftpSuccessChannel")public void handleSftpSuccess(AdviceMessage<String> message) {// 从消息头中获取 日志idList<String> logIdListStr = (List<String>) message.getInputMessage().getHeaders().get(LOG_ID);if (CollectionUtils.isEmpty(logIdListStr)) {log.warn("处理上传成功的消息,日志 ID为空");} else {// TODO 上传文件完成的处理// 归档日志log.info("处理上传成功的消息,日志 ID: {}", String.join(", ", logIdListStr));iCxxpAuditLogService.complete(logIdListStr);}}// 监听失败的消息通道@ServiceActivator(inputChannel = "sftpFailureChannel")public void handleSftpFailure(ErrorMessage message) {Throwable cause = message.getPayload();log.error("上传SFTP错误原因:", cause);// 从消息头中获取 日志idList<String> logIdListStr = (List<String>) message.getHeaders().get(LOG_ID);if (CollectionUtils.isEmpty(logIdListStr)) {log.error("处理上传失败的消息,日志 ID为空");} else {log.error("处理上传失败的消息,日志 ID: {}", String.join(", ", logIdListStr));}}@MessagingGatewaypublic interface SftpGateway {@Gateway(requestChannel = "toSftpChannel")void sendToSftp(Message<String> message);}
}

这是配置SFTP上传文件的配置,下面主要是使用SftpGateway调用sendToSftp方法进行文件上传,这个方法也可以使用流,也可以使用File类,传入,这里直接使用字符串内容传入了。

使用上传

// 注入上传网关@Resourceprivate SftpConfig.SftpGateway sftpGateway;
...
// sftp上传文件内容
String payload = "hello";
sftpGateway.sendToSftp(MessageBuilder.withPayload(payload).setHeader(LOG_ID, ids).build());

这里就是使用Spring Integration SFTP 上传文件内容。

总结

这里主要是配置Spring Integration SFTP完成SFTP上传,这里主要复杂的地方,就是这个消息链机制,有点让人晕,不过主要就是上传成功了又个专门消息处理,上传失败了又有另外一个消息处理。然后,就是在使用上传的时候,可以通过往消息头里面设置key来传数据。

参考

  • SFTP Adapters
  • Spring Integration: SFTP Upload Example Using Key-Based Authentication
  • SFTP Inbound Channel Adapter
  • SFTP Streaming Inbound Channel Adapter

http://www.mrgr.cn/news/23584.html

相关文章:

  • 代码随想录:单调栈1-3
  • 【Kubernetes】记录一下初始化Kubernetes集群时遇到的问题
  • 定制个性化相亲交友平台的全流程解析
  • JAVA开源项目 员工绩效考核系统 计算机毕业设计
  • 【spring】IDEA 新建一个spring boot 项目
  • CentOS系统上Node.js安装与配置最佳实践
  • 软件测试学习笔记丨Postman实战练习
  • C/C++——野指针处理
  • 提高电子邮件营销点击率的17种强大策略
  • 如何实现滚动到el-table 的底部
  • 【Go】Go语言介绍与开发环境搭建
  • python进阶————上下文管理器跟生成器
  • C语言代码练习(第十九天)
  • STM3学习记录
  • 【集合】1.集合的概念
  • 【Hot100算法刷题集】双指针-01-移动零(含置零思路、移动思路、偏移量思路、冒泡法)
  • k线图中的三条线作用何在?
  • 【QT】VS2020+QT插件 CMake项目开发踩坑记录
  • C++--模板
  • 沟通技巧网课笔记