当前位置: 首页 > news >正文

python_每天定时向数据库插入数据

每天的零点十分,定时向mysql数据库插入,昨天新增的文件和昨天下载文件的记录。第一次运行的时候,会全量同步昨天之前的数据。

import os
import threading
from datetime import datetime, timedelta
import time
import schedule
from pymysql import Connectionlocalhost = 'localhost'
user = 'root'
password = 'root'
db = 'book'# 筛选所有非文件夹的文件
def dir_file(folder_path):filess = []for root, dirs, files in os.walk(folder_path):for file in files:filess.append(os.path.join(root, file))return filess# ******上传文件的处理******# 根据文件的创建日期,提前文件名、创建时间、路径等信息
def yesterday_files_count(files):# 获取每个文件的详细信息file_details = []for file in files:# 剔除掉tmt后缀的文件if 'tmt' not in file:# 文件夹路径,去掉前面路径,F:\24年路测数据\04-PBOX一型4G传输数据采集\parent_path = os.path.dirname(file).replace('\\', '/')[10:]# 文件名file_name = os.path.basename(file)# 设备名称device_name = file_name[:4]# 文件修改时间# m_time = os.path.getmtime(file_path),# 文件创建时间c_time = os.path.getctime(file)# 转换时间格式c_datetime = datetime.fromtimestamp(c_time)c_date = c_datetime.strftime('%Y-%m-%d')date_c = datetime.strptime(c_date, "%Y-%m-%d")# 获取昨天的日期yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")date_y = datetime.strptime(yesterday, "%Y-%m-%d")if date_c == date_y:# 追加到列表file_details.append({'device_name': device_name,'file_name': file_name,'parent_path': parent_path,'create_time': c_date,})# 打印文件名称和创建时间# for file_dict in file_details:#     print(#         f"('{file_dict['device_name']}','{file_dict['file_name']}','{file_dict['parent_path']}','{file_dict['create_time']}'),")current_time = datetime.now()print(f"{current_time}:完成创建时间是昨天的文件筛选")return file_detailsdef before_yesterday_files_count(files):# 获取每个文件的详细信息file_details = []for file in files:# 剔除掉tmt后缀的文件if 'tmt' not in file:# 文件夹路径,去掉前面路径,F:\24年路测数据\04-PBOX一型4G传输数据采集\parent_path = os.path.dirname(file).replace('\\', '/')[10:]# 文件名file_name = os.path.basename(file)# 设备名称device_name = file_name[:4]# 文件修改时间# m_time = os.path.getmtime(file_path),# 文件创建时间c_time = os.path.getctime(file)# 转换时间格式c_datetime = datetime.fromtimestamp(c_time)c_date = c_datetime.strftime('%Y-%m-%d')date_c = datetime.strptime(c_date, "%Y-%m-%d")# 获取昨天的日期yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")date_y = datetime.strptime(yesterday, "%Y-%m-%d")if date_c < date_y:# 追加到列表file_details.append({'device_name': device_name,'file_name': file_name,'parent_path': parent_path,'create_time': c_date,})# 打印文件名称和创建时间# for file_dict in file_details:#     print(#         f"('{file_dict['device_name']}','{file_dict['file_name']}','{file_dict['parent_path']}','{file_dict['create_time']}'),")current_time = datetime.now()print(f"{current_time}:完成创建时间在昨天之前的文件筛选")return file_details# ******日志文件的下载记录处理******# 拼接昨天的文件
def yesterday():# 获取昨天的日期yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")# 设置日志文件的模式,假设是按日期格式命名的,例如: log_2023-03-10.txtlog_pattern = f"fzs-{yesterday}.log"return log_pattern# 获取昨天之前的文件
def before_yesterday(file):# 截取日志文件名的日期部分part = file[-14:-4]date_p = datetime.strptime(part, "%Y-%m-%d")# 获取昨天的日期yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")date_y = datetime.strptime(yesterday, "%Y-%m-%d")if date_p < date_y:return Trueelse:return False# 搜索日志中的下载人员,时间,文件名
def log_download(log_file):# 设置关键字,匹配相邻的行,打开通道->建立连接->成功传输keyword_d = "150 Opening data channel for file download from server of"keyword_t = "TLS connection for data connection established"keyword_s = "226 Successfully transferred"download = []if os.path.exists(log_file):with open(log_file, 'r', encoding="utf-8", errors="ignore") as file:lines = file.readlines()for index, line in enumerate(lines):# 匹配相近的三行关键词if index <= len(lines) - 2 and keyword_d in line and keyword_t in lines[index + 1] and keyword_s in \lines[index + 2]:# 空格分隔,去掉末尾的换行dr = line.split(' ')[1].strip('\n').replace('/', '-')de = line.split(' ')[4].strip('\n')fn = line.split(' ')[-1].strip('\n')# 对文件路径进行裁剪,"/04-PBOX一型4G传输数据采集/RefData.24/Month.Aug/T_AQ_Incoming_2350.org"fnn = fn.strip('"').split('/')[-1]if 'tmt' not in fnn:download.append({'download_date': dr,'downloader': de,'file_name': fnn,})# for dl in download:#     print(#         f"('{dl['file_name']}','{dl['downloader']}','{dl['download_date']}'),")return download# 处理昨天的日志文件
def yesterday_job(log_path):log_pattern = yesterday()log_file = os.path.join(log_path, log_pattern)ld = []if os.path.exists(log_file):ld = log_download(log_file)sql_log(ld)current_time = datetime.now()print(f"{current_time}:完成昨天的日志文件筛选")# return ld# 处理昨天之前的日志文件
def before_yesterday_job(log_path):# 列出文件夹下所有文件和文件夹# files_and_dirs = os.listdir(log_path)# 过滤出文件,排除文件夹# files = [f for f in files_and_dirs if os.path.isfile(os.path.join(log_path, f))]ld = []files = dir_file(log_path)for file in files:if before_yesterday(file):ld = log_download(file)sql_log(ld)current_time = datetime.now()print(f"{current_time}:完成昨天之前的日志文件筛选")# return ld# ******数据库插入操作******# 新增文件的插入sql
def sql_file(sql_sta):con = Nonetry:# 创建数据库连接con = Connection(host=localhost,  # 主机名port=3306,  # 端口user=user,  # 账户password=password,  # 密码database=db,  # 指定操作的数据库autocommit=True  # 设置自动提交)# 获取游标对象cursor = con.cursor()# 使用游标对象,执行sql语句for sta in sql_sta:sql_ins = ("insert into record_ftp (device_name, file_name, file_path, create_time) values (" + "'" +sta['device_name'] + "'" + ", " + "'" + sta['file_name'] + "'" + "," + "'" + sta['parent_path'] + "'" + ", " + "'" + sta['create_time'] + "'" + ")")# print(sql_ins)cursor.execute(sql_ins)# 获取主键# print("主键id=", con.insert_id())# 确认提交# con.commit()except Exception as e:print("异常:", e)finally:if con:# 关闭连接con.close()# 日志信息的插入sql
def sql_log(sql_sta):con = Nonetry:# 创建数据库连接con = Connection(host=localhost,  # 主机名port=3306,  # 端口user=user,  # 账户password=password,  # 密码database=db,  # 指定操作的数据库autocommit=True  # 设置自动提交)# 获取游标对象cursor = con.cursor()# 使用游标对象,执行sql语句for sta in sql_sta:sql_ins = ("insert into record_download_details (file_name, downloader, download_time) values (" + "'" +sta['file_name'] + "'" + ", " + "'" + sta['downloader'] + "'" + "," + "'" + sta['download_date'] + "'" + ")")# print(sql_ins)cursor.execute(sql_ins)# 获取主键# print("主键id=", con.insert_id())# 确认提交# con.commit()except Exception as e:print("异常:", e)finally:if con:# 关闭连接con.close()# 处理昨天文件的job
def job_yesterday_file(folder_path):if os.path.exists(folder_path):# 对文件进行处理files = dir_file(folder_path)yfc = yesterday_files_count(files)sql_file(yfc)else:# 文件不存在current_time = datetime.now()print(f"{current_time}:该文件夹不存在", log_path)# 处理昨天之前文件的job
def job_before_yesterday_file(folder_path):if os.path.exists(folder_path):# 对文件进行处理files = dir_file(folder_path)byfc = before_yesterday_files_count(files)sql_file(byfc)else:# 文件不存在current_time = datetime.now()print(f"{current_time}:该文件夹不存在", log_path)# 处理昨天的日志文件的job
def job_yesterday_log(log_path):if os.path.exists(log_path):# 对文件进行处理yesterday_job(log_path)# 调用before_yesterday_job方法,仅返回最后一个日志文件的下载记录# sql_log(yj)else:# 文件不存在current_time = datetime.now()print(f"{current_time}:该文件夹不存在", log_path)# 处理昨天之前的日志文件的job
def job_before_yesterday_log(log_path):if os.path.exists(log_path):# 对文件进行处理before_yesterday_job(log_path)# 调用before_yesterday_job方法,仅返回最后一个日志文件的下载记录# sql_log(byj)else:# 文件不存在current_time = datetime.now()print(f"{current_time}:该文件夹不存在", log_path)# 多线程运行
def run_threading(job_func, path):# 多线程并行运行job_thread = threading.Thread(target=job_func, args=(path,))job_thread.start()if __name__ == '__main__':# 文件处理folder_path = os.getcwd()# folder_path = r'E:\DPI\WDWG'# 日志处理# log_path = os.getcwd()# log_path = r'E:\data\ftp的日志\Logs'log_path = r'F:\FileZilla\FileZilla Serve 中文版\Logs'run_time = "00:10"current_time = datetime.now()print(f"{current_time}:每天" + run_time + "执行任务,统计昨天新增的文件和昨天下载的日志信息")# 创建调度器schedule_daily_file = schedule.Scheduler()schedule_daily_log = schedule.Scheduler()# 每天定时调度任务schedule_daily_file.every().day.at(run_time).do(run_threading, job_yesterday_file, folder_path)current_time = datetime.now()print(f"{current_time}:完成昨日新增文件的统计")schedule_daily_log.every().day.at(run_time).do(run_threading, job_yesterday_log, log_path)current_time = datetime.now()print(f"{current_time}:完成昨日日志下载的统计")# 初始化时,调用一次job_before_yesterday_file(folder_path)job_before_yesterday_log(log_path)# 立即执行所有任务schedule_daily_file.run_all()schedule_daily_log.run_all()while True:schedule_daily_file.run_pending()schedule_daily_log.run_pending()time.sleep(1)

运行的日志:

数据库:昨日新增文件的表

下载记录表:


http://www.mrgr.cn/news/11016.html

相关文章:

  • Kubernetes存储入门
  • 【创作活动】你是如何克服编程学习中的挫折感的
  • 第四节:Nodify 连接端子手动连接
  • 代码随想录算法训练营day53:图04:104.建造最大岛屿;110. 字符串接龙;105.有向图的完全可达性
  • Mac外接4K显示器 字体大小适应 设置HIDPI
  • 开源低代码LLM编排平台Dify:可视化Agent和工作流,如何部署在自己系统中,自定义修改前后端详解
  • PHP在现代Web开发中的高效应用与实战案例
  • SpringMVC - 第一个 SpringMVC 程序
  • OpenCV+Python自动填涂机读卡
  • OpenCV绘图函数(2)绘制圆形函数circle()的使用
  • 用Python插入SVG到PDF文档
  • 数学建模学习(118):牛顿冷却定律的原理解析、案例分析与Python求解
  • 【HuggingFace Transformers】BertIntermediate 和 BertPooler源码解析
  • 沈阳网站建设手机能看的网站
  • 0基础学习Python路径(29)collections模块
  • ubuntu系统在线安装下载firefox-esr流览器
  • WIN11环境下,如何在指定目录快速启动jupyter lab或jupyter notebook
  • MongoDB适用场景
  • 空气净化器怎么选能除猫毛?宠物空气净化器除味好的分享
  • GLM-4-Flash 大模型API免费了,手把手构建“儿童绘本”应用实战(附源码)