JAVA TCP协议初体验
文章目录
- 一、需求概述
- 二、设计选择
- 三、代码结构
- 四、代码放送
- 五、本地调试
- 1. 服务端日志
- 2. 客户端日志
- 3. 断线重连日志
- 六、服务器部署运行
- 1. 源码下载
- 2. 打包镜像
- 3. 运行容器
一、需求概述
最近开发某数据采集系统,系统整体的数据流程图如下:
同时,数据中心又需要下发命令到某客户端执行,客户端执行完成后将结果通知到数据中心。
二、设计选择
考虑功能点:
- 客户端多个,一段时间内数量可控相对固定。
- 客户端主动连接服务端,支持断线重连。
- 客户端与服务端支持双向通信。
选择TCP协议作为客户端与数据中心之间的交互协议比较合适,数据中心服务器作为tcp-server开放端口供tcp-client连接。
三、代码结构
四、代码放送
https://gitcode.com/00fly/tcp-show
或者使用下面的备份文件恢复成原始的项目代码
如何恢复,请移步查阅:神奇代码恢复工具
//goto docker\docker-compose.yml
# Compose stack for the TCP demo: one server container and one client container.
version: '3.7'
services:
  tcp-server:
    image: registry.cn-shanghai.aliyuncs.com/00fly/tcp-show-server:0.0.1
    container_name: tcp-server
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 64M
        reservations:
          cpus: '0.05'
          memory: 64M
    ports:
    # Quoted so the mapping is always read as a string.
    - '8000:8000'
    restart: on-failure
    logging:
      driver: json-file
      options:
        max-size: '5m'
        max-file: '1'
  tcp-client:
    image: registry.cn-shanghai.aliyuncs.com/00fly/tcp-show-client:0.0.1
    container_name: tcp-client
    depends_on:
    - tcp-server
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 64M
        reservations:
          cpus: '0.05'
          memory: 64M
    restart: on-failure
    environment:
    # StartClient reads TCP_SERVER and falls back to 127.0.0.1 when it is blank.
    #- TCP_SERVER=192.168.15.202
    - TCP_SERVER=tcp-server
    logging:
      driver: json-file
      options:
        max-size: '5m'
        max-file: '1'
//goto docker\restart-server.sh
#!/bin/bash
# Simulate a server outage so the client's reconnect logic can be observed:
# remove only the server container, wait, then bring it back and follow its log.
# `rm -sf` (stop + force remove) is used instead of `down <service>` because
# `docker-compose down` does not accept a service name in Compose v1.
docker-compose rm -sf tcp-server
sleep 10
docker-compose up -d tcp-server
docker logs -f tcp-server
//goto docker\restart.sh
#!/bin/bash
# Recreate the whole stack, then follow the server log.
docker-compose down && docker-compose up -d
sleep 2
# The server container is named tcp-server in docker-compose.yml.
docker logs -f tcp-server
//goto docker\stop.sh
#!/bin/bash
# Tear down the compose project (stops and removes its containers).
docker-compose down
//goto Dockerfile
# Base image
#FROM openjdk:8-jre-alpine
FROM adoptopenjdk/openjdk8-openj9:alpine-slim

# Use the Asia/Shanghai time zone inside the container
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    echo 'Asia/Shanghai' >/etc/timezone

# Copy the release jar
COPY target/*.jar /app.jar

# Startup command
ENTRYPOINT ["java", "-Djava.security.egd=file:/dev/./urandom", "-Xshareclasses", "-Xquickstart", "-jar", "/app.jar"]
//goto pom-client.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the TCP client: fat jar with StartClient as main class, plus its Docker image. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fly</groupId>
    <artifactId>tcp-show</artifactId>
    <version>0.0.1</version>
    <name>tcp-show</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
        <java.version>1.8</java.version>
        <skipTests>true</skipTests>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <!-- 2.12.1 is affected by CVE-2021-44228 (Log4Shell); 2.17.1 is a patched release that still runs on Java 8 -->
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}-client-${project.version}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- Option 1: runnable jar bundled with its dependencies -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.5.0</version>
                <configuration>
                    <!-- Whether to append the assemblyId to the artifact name -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <archive>
                        <manifest>
                            <mainClass>com.fly.protocol.tcp.run.StartClient</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <!-- Bundle every external dependency JAR into the generated JAR -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <!-- Execution configuration -->
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <!-- Bound to the package phase -->
                        <goals>
                            <goal>single</goal>
                            <!-- Run only once -->
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- docker-maven plugin -->
            <plugin>
                <groupId>io.fabric8</groupId>
                <artifactId>docker-maven-plugin</artifactId>
                <version>0.40.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>build</goal>
                            <!--<goal>push</goal>-->
                            <!--<goal>remove</goal>-->
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!-- Connect to a Linux host with Docker to build the image -->
                    <!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
                    <!-- Registry the Docker image is pushed to -->
                    <pushRegistry>${docker.hub}</pushRegistry>
                    <images>
                        <image>
                            <name>${docker.hub}/00fly/${project.artifactId}-client:${project.version}</name>
                            <build>
                                <dockerFileDir>${project.basedir}</dockerFileDir>
                            </build>
                        </image>
                    </images>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
//goto pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the TCP server: fat jar with StartServer as main class, plus its Docker image. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fly</groupId>
    <artifactId>tcp-show</artifactId>
    <version>0.0.1</version>
    <name>tcp-show</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
        <java.version>1.8</java.version>
        <skipTests>true</skipTests>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <!-- 2.12.1 is affected by CVE-2021-44228 (Log4Shell); 2.17.1 is a patched release that still runs on Java 8 -->
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}-server-${project.version}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- Option 1: runnable jar bundled with its dependencies -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.5.0</version>
                <configuration>
                    <!-- Whether to append the assemblyId to the artifact name -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <archive>
                        <manifest>
                            <mainClass>com.fly.protocol.tcp.run.StartServer</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <!-- Bundle every external dependency JAR into the generated JAR -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <!-- Execution configuration -->
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <!-- Bound to the package phase -->
                        <goals>
                            <goal>single</goal>
                            <!-- Run only once -->
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- docker-maven plugin -->
            <plugin>
                <groupId>io.fabric8</groupId>
                <artifactId>docker-maven-plugin</artifactId>
                <version>0.40.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>build</goal>
                            <!--<goal>push</goal>-->
                            <!--<goal>remove</goal>-->
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!-- Connect to a Linux host with Docker to build the image -->
                    <!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
                    <!-- Registry the Docker image is pushed to -->
                    <pushRegistry>${docker.hub}</pushRegistry>
                    <images>
                        <image>
                            <name>${docker.hub}/00fly/${project.artifactId}-server:${project.version}</name>
                            <build>
                                <dockerFileDir>${project.basedir}</dockerFileDir>
                            </build>
                        </image>
                    </images>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
//goto src\main\java\com\fly\protocol\tcp\bio\TcpClient.java
package com.fly.protocol.tcp.bio;import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;import lombok.extern.slf4j.Slf4j;@Slf4j
public class TcpClient implements Runnable
{private String ip;private int port;private Socket socket;private DataOutputStream dataOutputStream;private String clientName;private boolean isClientCoreRun = false;private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);private ExecutorService executor = Executors.newFixedThreadPool(2);public TcpClient(String clientName){super();this.clientName = clientName;}/*** * @param ip 服务端IP* @param port 服务端PORT* @return*/public boolean connectServer(String ip, int port){try{this.ip = ip;this.port = port;socket = new Socket(InetAddress.getByName(ip), port);log.info("****** TcpClient will connect to Server {}:{}", ip, port);scheduler.scheduleAtFixedRate(this::checkConnection, 0, 10, TimeUnit.SECONDS);isClientCoreRun = true;dataOutputStream = new DataOutputStream(socket.getOutputStream());dataOutputStream.writeUTF(clientName);dataOutputStream.flush();}catch (IOException e){log.error(e.getMessage());isClientCoreRun = false;}return isClientCoreRun;}/*** 检查TCP连接*/private void checkConnection(){if (socket == null || socket.isClosed()){log.error("Connection lost, attempting to reconnect");reconnect();}}private void reconnect(){try{socket = new Socket(InetAddress.getByName(ip), port);log.info("****** TcpClient will connect to Server {}:{}", ip, port);isClientCoreRun = true;executor.execute(new ReceiveMsg());dataOutputStream = new DataOutputStream(socket.getOutputStream());dataOutputStream.writeUTF(clientName);dataOutputStream.flush();}catch (IOException e){log.error(e.getMessage());isClientCoreRun = false;}}/*** 发送报文*/public void sendMsg(String msg){try{dataOutputStream.writeUTF(msg);dataOutputStream.flush();}catch (IOException e){log.error(e.getMessage());closeClientConnect();}}/*** 断开客户端与服务端的连接*/public void closeClientConnect(){if (dataOutputStream != null){try{dataOutputStream.close();isClientCoreRun = false;if (socket != null){socket.close();}}catch (IOException e){log.error(e.getMessage());}}}@Overridepublic void run(){executor.execute(new 
ReceiveMsg());// 发送数据scheduler.scheduleAtFixedRate(() -> {sendMsg(RandomStringUtils.randomAlphanumeric(10));}, RandomUtils.nextInt(1, 10), 10, TimeUnit.SECONDS);}class ReceiveMsg implements Runnable{private DataInputStream dataInputStream;public ReceiveMsg(){try{// 数据输入流dataInputStream = new DataInputStream(socket.getInputStream());}catch (IOException e){log.error(e.getMessage());}}@Overridepublic void run(){try{// server停止后, 会影响接受消息线程工作while (isClientCoreRun){String msg = dataInputStream.readUTF();log.info("{} get msg: {}", clientName, msg);}}catch (IOException e){log.error(e.getMessage());// 防止重连失败closeClientConnect();}}}
}
//goto src\main\java\com\fly\protocol\tcp\bio\TcpServer.java
package com.fly.protocol.tcp.bio;import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;import lombok.extern.slf4j.Slf4j;@Slf4j
public class TcpServer implements Runnable
{private ServerSocket serverSocket;private boolean isServerCoreRun = false;private Map<String, NewClient> allClient = new HashMap<>();public boolean startServer(String ip, int port){try{serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(ip, port));isServerCoreRun = true;}catch (IOException e){log.error(e.getMessage());isServerCoreRun = false;}return isServerCoreRun;}/*** 关闭服务** #1 断开与所有客户端的连接,并将客户端容器中的所有已连接的客户端清空。 #2 关闭服务器套接字*/public void closeServer(){try{isServerCoreRun = false;for (Map.Entry<String, NewClient> all : this.allClient.entrySet()){all.getValue().isNewClientRun = false;all.getValue().socket.close();}allClient.clear();serverSocket.close();}catch (IOException e){log.error(e.getMessage());}}/*** 向客户端发送报文*/public void sendMsg(String clientName, String msg){if (allClient.containsKey(clientName)){allClient.get(clientName).sendMsg(msg);}}@Overridepublic void run(){try{log.info("TcpServer will start");while (isServerCoreRun){// 阻塞式等待客户端连接Socket socket = serverSocket.accept();String clientName = new DataInputStream(socket.getInputStream()).readUTF();String clientIP = socket.getInetAddress().getHostAddress();int clientPort = socket.getPort();String clientConnectDateTime = DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");NewClient newClient = new NewClient(socket, clientName, clientIP, clientPort, clientConnectDateTime);allClient.put(clientName, newClient);log.info("**** add new client ===> {}", allClient.keySet());new Thread(newClient).start();}}catch (IOException e){log.error(e.getMessage());}}class NewClient implements Runnable{// 客户端套接字private Socket socket;// 数据输入流private DataInputStream dataInputStream;// 数据输出流private DataOutputStream dataOutputStream;// 客户端运行(收、发报文)状态private boolean isNewClientRun = true;// 客户端的名称private String clientName;// 客户端的IP地址private String clientIP;public NewClient(){}// 构造方法初始化成员属性public NewClient(Socket socket, String clientName, String clientIP, int clientPort, String 
clientConnectDateTime){this.socket = socket;this.clientName = clientName;this.clientIP = clientIP;try{// 创建客户端数据输入、输出流dataInputStream = new DataInputStream(socket.getInputStream());dataOutputStream = new DataOutputStream(socket.getOutputStream());}catch (IOException e){log.error(e.getMessage());closeCurrentClient();}}@Overridepublic void run(){try{// 客户端在运行才能收发报文while (this.isNewClientRun){// 获取到客户端发送的报文String msg = dataInputStream.readUTF();if (StringUtils.isNotBlank(msg)){log.info("clientName: {}, clientIP: {}, send msg ===> {}", clientName, clientIP, msg);}// 向客户端传送数据int index = 0;for (String key : allClient.keySet()){index++;if (StringUtils.equals(key, clientName)){allClient.get(key).sendMsg("from server: " + msg + StringUtils.repeat("-----", index));}}}}catch (IOException e){log.error(e.getMessage());closeCurrentClient();}}/*** 断开当前客户端的连接释放资源*/public void closeCurrentClient(){try{// 结束客户端的运行状态isNewClientRun = false;// 断开数据输出出流if (dataOutputStream != null){dataOutputStream.close();}// 断开数据输入出流if (dataInputStream != null){dataInputStream.close();}// 断开客户端套解析if (socket != null){socket.close();}// 将该客户端从客户端容器中删除allClient.remove(clientName);log.info("**** remove client ===> {}", allClient.keySet());}catch (IOException e){log.error(e.getMessage());}}/*** 发送报文*/public void sendMsg(String msg){try{// 发送报文dataOutputStream.writeUTF(msg);// 清空报文缓存dataOutputStream.flush();}catch (IOException e){log.error(e.getMessage());closeCurrentClient();}}}
}
//goto src\main\java\com\fly\protocol\tcp\run\StartClient.java
package com.fly.protocol.tcp.run;

import java.util.stream.IntStream;

import org.apache.commons.lang3.StringUtils;

import com.fly.protocol.tcp.bio.TcpClient;

/**
 * Entry point of the client side: starts three clients named CLIENT_1..CLIENT_3.
 */
public class StartClient
{
    private static final int SERVER_PORT = 8000;

    /** Pause between two initial connection attempts. */
    private static final long RETRY_DELAY_MILLIS = 5_000L;

    public static void main(String[] args)
    {
        // Under docker, the value from docker-compose's environment section takes precedence
        String serverIp = StringUtils.defaultIfBlank(System.getenv().get("TCP_SERVER"), "127.0.0.1");
        IntStream.rangeClosed(1, 3).forEach(i -> {
            TcpClient client = new TcpClient("CLIENT_" + i);
            if (connectWithRetry(client, serverIp))
            {
                new Thread(client).start();
            }
        });
    }

    /**
     * Keeps trying to connect until the server accepts. Without this, a client started before
     * the server is listening would give up for good and the process would end normally.
     *
     * @return {@code true} once connected, {@code false} if the thread was interrupted while waiting
     */
    private static boolean connectWithRetry(TcpClient client, String serverIp)
    {
        while (!client.connectServer(serverIp, SERVER_PORT))
        {
            try
            {
                Thread.sleep(RETRY_DELAY_MILLIS);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
//goto src\main\java\com\fly\protocol\tcp\run\StartServer.java
package com.fly.protocol.tcp.run;import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

import com.fly.protocol.tcp.bio.TcpServer;

/**
 * Entry point of the server side: binds 0.0.0.0:8000, starts the accept loop and periodically
 * pushes a random message to one of CLIENT_1..CLIENT_3.
 */
public class StartServer
{
    public static void main(String[] args)
    {
        TcpServer server = new TcpServer();
        if (!server.startServer("0.0.0.0", 8000))
        {
            return;
        }

        // First push after 10 seconds, then one every 60 seconds
        Runnable pushRandomMsg = () -> {
            String target = "CLIENT_" + RandomUtils.nextInt(1, 4);
            String payload = "random: " + RandomStringUtils.randomAlphanumeric(10);
            server.sendMsg(target, payload);
        };
        Executors.newScheduledThreadPool(2).scheduleAtFixedRate(pushRandomMsg, 10, 60, TimeUnit.SECONDS);

        new Thread(server).start();
    }
}
//goto src\main\resources\log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: everything at INFO and above goes to the console. -->
<configuration status="off" monitorInterval="0">
    <appenders>
        <console name="Console" target="system_out">
            <patternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n" />
        </console>
    </appenders>
    <loggers>
        <root level="INFO">
            <appender-ref ref="Console" />
        </root>
    </loggers>
</configuration>
五、本地调试
先后启动StartServer、StartClient
1. 服务端日志
2. 客户端日志
3. 断线重连日志
六、服务器部署运行
1. 源码下载
在安装好jdk、maven、docker环境的服务器下载源码
git clone https://gitcode.com/00fly/tcp-show.git
2. 打包镜像
#server打包
mvn clean package
#client打包
mvn clean package -f pom-client.xml
3. 运行容器
上传docker文件目录到服务器,执行
sh restart.sh
sh restart-server.sh
docker logs -f tcp-server
docker logs -f tcp-client
有任何问题和建议,都可以向我提问讨论,大家一起进步,谢谢!
-over-