如何使用client-go构建pod web shell

news/2024/5/21 3:10:46

代码示例及原理

  • 原理是利用websocket协议实现对pod的exec登录,利用client-go构造与远程apiserver的长连接,将对pod容器的输入和pod容器的输出重定向到我们的io方法中,从而实现浏览器端的虚拟终端的效果
  • 消息体结构如下
type Connection struct {WsSocket    *websocket.Conn // 主websocket连接OutWsSocket *websocket.ConnInChan      chan *WsMessage // 输入消息管道OutChan     chan *WsMessage // 输出消息管道Mutex     sync.Mutex // 并发控制IsClosed  bool // 是否关闭CloseChan chan byte // 关闭连接管道
}
// 消息体
type WsMessage struct {MessageType int    `json:"messageType"`Data        []byte `json:"data"`
}
// terminal的行宽和列宽
type XtermMessage struct {Rows uint16 `json:"rows"`Cols uint16 `json:"cols"`
}
  • 下面需要一个handler来控制终端和接收消息,ResizeEvent用来控制终端变更的事件
type ContainerStreamHandler struct {WsConn      *ConnectionResizeEvent chan remotecommand.TerminalSize
}
  • 为了控制终端,我们需要重写TerminalSize的Next方法,这个方法是client-go定义的接口,如下所示
/*
Copyright 2017 The Kubernetes Authors.Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/package remotecommand// TerminalSize and TerminalSizeQueue was a part of k8s.io/kubernetes/pkg/util/term
// and were moved in order to decouple client from other term dependencies// TerminalSize represents the width and height of a terminal.
type TerminalSize struct {Width  uint16Height uint16
}// TerminalSizeQueue is capable of returning terminal resize events as they occur.
type TerminalSizeQueue interface {// Next returns the new terminal size after the terminal has been resized. It returns nil when// monitoring has been stopped.Next() *TerminalSize
}
  • 我们重写的Next方法如下
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {select {case ret := <-handler.ResizeEvent:size = &retcase <-handler.WsConn.CloseChan:return nil // 这里很重要, 具体见最后的解释}return
}
  • 当我们从ResizeEvent管道中接收到调整终端大小的事件之后,这个事件会被client-go接收到,源码如下
func (p *streamProtocolV3) handleResizes() {if p.resizeStream == nil || p.TerminalSizeQueue == nil {return}go func() {defer runtime.HandleCrash()encoder := json.NewEncoder(p.resizeStream)for {size := p.TerminalSizeQueue.Next() // 接收到我们的调整终端大小的事件if size == nil {return}if err := encoder.Encode(&size); err != nil {runtime.HandleError(err)}}}()
}
  • 最后我们给出核心的实现(只是一个大概的框架,具体细节有问题可以留言)
func ExecCommandInContainer(ctx context.Context, conn *tty.Connection, podName, namespace, containerName string) (err error) {kubeClient, err := k8s.CreateClientFromConfig([]byte(env.Kubeconfig))if err != nil {return}restConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(env.Kubeconfig))if err != nil {return}// 构造请求req := kubeClient.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(namespace).SubResource("exec").VersionedParams(&corev1.PodExecOptions{Command:   []string{"/bin/sh", "-c", "export LANG=\"en_US.UTF-8\"; [ -x /bin/bash ] && exec /bin/bash || exec /bin/sh"},Container: containerName,Stdin:     true,Stdout:    true,Stderr:    true,TTY:       true,}, scheme.ParameterCodec)// 使用spdy协议对http协议进行增量升级 exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())if err != nil {return err}handler := &tty.ContainerStreamHandler{WsConn:      conn,ResizeEvent: make(chan remotecommand.TerminalSize),}// 核心函数,重定向标准输入和输出err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{Stdin:             handler,Stdout:            handler,Stderr:            handler,TerminalSizeQueue: handler,Tty:               true,})return
}
  • 整个函数的核心是StreamWithContext函数,这个函数是client-go的一个方法,接下来我们详细分析一下
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {conn, streamer, err := e.newConnectionAndStream(ctx, options)if err != nil {return err}defer conn.Close()panicChan := make(chan any, 1) // panic管道errorChan := make(chan error, 1) // error管道go func() {defer func() {if p := recover(); p != nil {panicChan <- p}}()errorChan <- streamer.stream(conn)}()select {case p := <-panicChan:panic(p)case err := <-errorChan:return errcase <-ctx.Done():return ctx.Err()}
}
  • 这个方法首先初始化了一个连接,然后定义了两个管道,分别接收panic事件和error事件,核心是streamer.stream()方法,接下来我们分析一下这个方法
func (p *streamProtocolV4) stream(conn streamCreator) error {// 创建一个与apiserver的连接传输流if err := p.createStreams(conn); err != nil {return err}// now that all the streams have been created, proceed with reading & copying// 观察流中的错误errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})// 监听终端调整的事件p.handleResizes()// 将我们的标准输入拷贝到remoteStdin也就是远端的标准输入当中p.copyStdin()var wg sync.WaitGroup// 将远端的标准输出拷贝到我们的标准输出当中p.copyStdout(&wg)p.copyStderr(&wg)// we're waiting for stdout/stderr to finish copyingwg.Wait()// waits for errorStream to finish reading with an error or nilreturn <-errorChan
}
  • 整体逻辑还是很清晰的,具体实现细节看源码吧

OOM 问题

  • 这个功能上线之后,发现内存不断攀升,如下图(出现陡降是因为我重启了服务)
    在这里插入图片描述
  • 使用pprof进行问题排查,在你的main.go文件中加入下面的内容
import _ "net/http/pprof"func main(){go func() {http.ListenAndServe("localhost:6060", nil)}()
}
  • 然后你可以在http://localhost:6060/debug/pprof/中看到下面的页面
    在这里插入图片描述
  • 点击full goroutine stack dump,你会看到goroutine的堆栈存储情况,如下图
    在这里插入图片描述
  • 查看之后发现出现了很多的残留goroutine,出现在Next方法中,如下图
    在这里插入图片描述
  • 这个问题出现的原因是我们重写的Next方法,当断开连接的时候必须要主动返回一个nil,否则会残留一个go func,具体看上面的handleResizes方法中有一个for循环,必须收到一个sizenil,才能跳出此func,一开始我写的方法如下,这样写的话当浏览器退出的时候,是不会给我一个nil的终端调整的事件的
// 错误写法
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {ret := <-handler.ResizeEvent:size = &retreturn
}
// 正确写法
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {select {case ret := <-handler.ResizeEvent:size = &retcase <-handler.WsConn.CloseChan:return nil // 当发现管道关闭的时候,主动返回一个nil}return
}

有问题欢迎交流


http://www.mrgr.cn/p/52141520

相关文章

【c++算法篇】双指针(下)

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;算法笔记仓 朋友们大家好啊&#xff0c;本篇文章我们来到算法的双指针的第二部分 目录 1.有效三角形的个数2.查找总价格为目标值的两个商品3.三数之和4.四数之和5.双指针常见场景总结 1.有效三角形…

移动机器人系统与技术:自动驾驶、移动机器人、旋翼无人机

这本书全面介绍了机器人车辆的技术。它介绍了道路上自动驾驶汽车所需的概念。此外&#xff0c;读者可以在六足机器人的构造、编程和控制方面获得宝贵的知识。 这本书还介绍了几种不同类型旋翼无人机的控制器和空气动力学。它包括各种旋翼推进飞行器在不同空气动力学环境下的模…

ThreeJS:本地部署官网文档与案例

部署方式 部署之前请确保已经配置好node.js环境。 1. 下载ThreeJS源码 ThreeJS的GitHub地址&#xff1a;GitHub - mrdoob/three.js: JavaScript 3D Library.&#xff0c;可以简单查看ThreeJS当前版本&#xff1a;r164&#xff0c; 我们可以选择对应的版本&#xff08;此处为r1…

10秒以上无错误!猫态量子比特稳定性达到新水平

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 文丨 浪味仙 排版丨沛贤 深度好文&#xff1a;1200字丨5分钟阅读 摘要&#xff1a;与涉及超导电路的其他量子比特设计相比&#xff0c;使用猫态量子比特可能会“将用于纠错的量子比特数量减少到…

SpringBoot整合Mybatis时mapper文件和xml文件的位置

xml文件放在resources下看下我的项目目录2.由于放在resurces下就无法扫描到xml文件,所以就需要在配置文件配置--mapper文件位置 mybatis.mapper-locations=classpath:mapper/*.xml 或 mybatis.mapper-locations=classpath:/mapper/*.xmlxml和mapper文件放在一起我的项目目录但…

Flume进阶

目录 第1关&#xff1a;拦截器的使用 第2关&#xff1a;自定义拦截器 第1关&#xff1a;拦截器的使用 代码文件&#xff1a; # Define source, channel, sink #agent名称为a1# Define source #source类型配置为avro,监听8888端口&#xff0c;后台会自动发送数据到该端口 #拦截后…

论文阅读:《Sequence can Secretly Tell You What to Discard》,减少推理阶段的 kv cache

目前各类大模型都支持长文本&#xff0c;例如 kimi chat 以及 gemini pro&#xff0c;都支持 100K 以及更高的上下文长度。但越长的上下文&#xff0c;在推理过程中需要存储的 kv cache 也越多。假设&#xff0c;数据的批次用 b 表示&#xff0c;输入序列的长度仍然用 s 表示&a…

聚观早报 | 苹果新款iPad Pro发布;国产特斯拉4月交付量

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 5月9日消息 苹果新款iPad Pro发布 国产特斯拉4月交付量 iOS 18新功能爆料 真我GT Neo6续航细节 三星Galaxy Z F…

#Scurm冲刺第五天

Scurm冲刺第五天 1. 站立式会议内容昨日已完成任务 今日计划完成任务前端UI设计代码编写(收藏页面,商品详情页,个人中心页) 前端UI设计代码编写(购物车页面,订单页面,订单详情页,搜索后商品展示页),前端界面合理跳转功能实现后端管理员模块功能实现(登录注册功能,用户…

m1_day7

课程内容:数组的排序引用数据类型的数组面向对象封装继承多态数组的排序:手动排序 冒泡排序 *自动排序Arrays.sort(数组对象);只能升序排序import java.util.*;引用数据类型的数组:当我们创建一个引用数据类型的数组的时候 其实里面一个对象都没有 里面都是默认值null 为了防…

数据分析:基于sparcc的co-occurrence网络

介绍 Sparcc是基于16s或metagenomics数据等计算组成数据之间关联关系的算法。通常使用count matrix数据。 安装Sparcc软件 git clone gitgithub.com:JCSzamosi/SparCC3.git export PATH/path/SparCC3:$PATHwhich SparCC.py导入数据 注&#xff1a;使用rarefy抽平的count ma…

『先进技术助力』Kompas AI:智能AI代理在工作中的应用与效率提升

『智能化未来』Kompas AI如何改变我们的工作方式&#xff1f; 在这个信息时代&#xff0c;利用AI聊天机器人来处理机械性的工作已经成为一种趋势。ChatGPT作为一种智能助手&#xff0c;不仅能够提高工作效率&#xff0c;还可以帮助我们更明智地做出决策&#xff0c;从而释放出更…

【视频】多元线性回归模型原理讲解与R语言实例

原文链接:https://tecdat.cn/?p=36149 原文出处:拓端数据部落公众号 分析师:Xue Yang 近年来,随着计量经济学和统计学的快速发展,回归模型作为一种有效的数据分析工具,被广泛应用于金融市场的分析中。回归模型能够通过建立变量之间的数学关系,揭示变量之间的相互作用机…

Python随机波动性SV模型:贝叶斯推断马尔可夫链蒙特卡洛MCMC分析英镑/美元汇率时间序列数据

全文链接:https://tecdat.cn/?p=33885 原文出处:拓端数据部落公众号 本文描述了帮助客户使用马尔可夫链蒙特卡洛(MCMC)方法通过贝叶斯方法估计基本的单变量随机波动模型,就像Kim等人(1998年)所做的那样。 定义模型以及从条件后验中抽取样本的函数的代码也在Python脚本中…

Hbase 常用shell操作

目录 1、创建表 1.1、启动HBase Shell 1.2、创建表 1.3、查看表 1.4、删除表 2、插入数据 2.1、put命令 3、查看数据 3.1、get命令 3.2、查询数据中文显示 4、更新数据 4.1、使用put来更新数据 5、删除数据 5.1、delete命令 5.2、删除指定列的数据 5.3、delete…

vs2019 里 C++ 20规范的 string 类的源码注释

&#xff08;1&#xff09;读源码&#xff0c;可以让我们更好的使用这个类&#xff0c;掌握这个类&#xff0c;知道咱们使用了库代码以后&#xff0c;程序大致具体是怎么执行的。而不用担心程序出不知名的意外的问题。也便于随后的代码调试。 string 类实际是 库中 basic_strin…

c语言题库之序列合并

文章目录 前言C语言题目&#xff1a;分析1. 合并逻辑2.图解合并逻辑 代码实现注意事项总结思考 前言 在编程中&#xff0c;我们经常遇到需要将两个有序序列合并为一个有序序列的问题。下面&#xff0c;我们就来详细探讨一下如何解决这个问题&#xff0c;包括输入处理、合并逻辑…

团队作业4--项目冲刺 第4篇 Scrum 冲刺博客

这个作业属于哪个课程 软件工程这个作业要求在哪里 团队作业4——项目冲刺这个作业的目标 团队完成任务的分配,明确团队每个人在接下来七天敏捷冲刺的目标其他参考文献这个作业所属团队 SuperNewCode团队成员 张楠 曾琳备 黄铭涛 张小宇 周广1.每日举行站立时会议2.燃尽图3.每…

详细介绍ARM-ORACLE Database 19c数据库下载

目录 1. 前言 2. 获取方式 2.1 ORACLE专栏 2.2 ORACLE下载站点 1. 前言 现有网络上已有非常多关于ORACLE数据库机下载的介绍&#xff0c;但对于ARM平台的介绍不多&#xff0c;借此机会我将该版的下载步骤做如下说明&#xff0c;希望能够一些不明之人提供帮助和参考 2. 获…

网络管理实验四、SNMP协议分析

1 实验概括 实验目的 捕获SNMP报文&#xff0c;通过报文分析进一步理解SNMP的报文结构、MIB-2树的结构、理解管理信息结构SMI及其规定的ASN.1。 实验内容 1、自行挑选两个网管对象&#xff0c;分别使用get&#xff0c;get-next取其值。 2、使用抓包软件抓取数据包。 3、分析并…