# -*- coding: UTF-8 -*-
"""=========================================================
@Project -> File: ReportAnalysis -> AnalysisMain
@IDE: PyCharm
@author: lxc
@date: 2023-10-24 下午 3:02
@Desc:
1-功能描述:
中车项目支持
将excel文件路径添加到一个列表中,再遍历该列表,依次处理表格内容
2-实现步骤
1- 读取文件根路径,目的是添加文件路径列表:
如果碰到压缩包,则解压该文件,再处理解压后的路径,递归该方法;
如果碰到excel文件(不论是xls还是xlsx),则添加进文件路径列表;
如果碰到路径,则再递归该方法
这样一来便得到了该路径下的所有excel文件
2- 处理excel文件,按指定需求将表格内容提取成需求字段对应的字典
3- 将item元数据入库
3-包说明:
1- 测试数据库使用了peewee连接sqlite
2- 使用了pandas处理excel表格,读取xls和xlsx分别依靠openpyxl、xlrd引擎
"""
import json
import re
import os
import zipfile
import pandas as pd
import math
import logging
import datetime
# from utils.model import ZhongChe
logger = logging.getLogger()
def industry_mapping(company_name):
"""
公司-产业关系
:return: industry_name
"""
mapping = {
"中车山东机车车辆有限公司(合并)": "轨道交通装备产业(货车)",
"中车山东机车车辆有限公司(差额)": "轨道交通装备产业(货车)",
"中车山东机车车辆有限公司(个体)": "轨道交通装备产业(货车)",
"中车山东机车车辆有限公司": "轨道交通装备产业(货车)",
"中车山东风电有限公司(合并)": "风电产业",
"中车山东风电有限公司(差额)": "风电产业",
"中车山东风电有限公司(个体)": "风电产业",
"中车山东风电有限公司": "风电产业",
"吉林中车风电装备有限公司": "风电产业",
"山东中车同力钢构有限公司": "风电产业",
"河北中车风电装备有限公司": "风电产业",
"山东中车同力达智能装备有限公司": "智能装备产业",
"江苏中车华腾环保科技有限公司": "环保产业",
"常熟中车水务有限公司": "环保产业",
"丹棱中车水务有限公司": "环保产业",
"常熟中车村镇水务有限公司": "环保产业",
"常熟中车环保水务有限公司": "环保产业",
"中车风电(锡林郭勒)有限公司": "风电产业"
}
return mapping.get(company_name, '')
def processing_bracket(string):
"""
处理字符串中的括号内容,不论中英文括号
返回去除括号内容的新字符串,以及括号内容的集合
:param string:
:return:
"""
# 处理△符号/'其中:'
string = string.replace("△", '').replace("其中:", '')
pattern = r'[((].+?[))]'
matches = re.findall(pattern, string)
remarks = ''.join(matches)
for matche in matches:
string = string.replace(matche, '')
return string, remarks
class AnalysisExcel:
def __init__(self, file_root):
# self.file_root = config.FILES_PATH
self.excel_file_list = []
self.file_root = file_root
self.output_list = []
self.id = 1
def analysis_excel(self, path):
suffix = path.split('.')[-1]
if suffix.lower() == 'xlsx':
df = pd.read_excel(path, engine="openpyxl", sheet_name=0)
else:
df = pd.read_excel(path, engine="xlrd", sheet_name=0)
total_raws_num = df.shape[0]
try:
row_number = [index for index, row in df.iterrows() if '累计' in str(row.values)][0]
except ValueError:
raise ValueError("首行读取错误。未找到'累计'出现的首行")
# 获取公司名称
companys = df.iloc[row_number - 1, 0]
if ':' in companys:
company_name = companys.split(':')[-1]
elif ':' in companys:
company_name = companys.split(':')[-1]
else:
company_name = companys
# 获取年月字段
dates = df.iloc[row_number - 1, 2]
pattern = r'(\d{4})年(\d{1,2})月'
def add_zero(match):
# 自定义替换函数来确保月份为两位数字
return f"{match.group(1)}-{match.group(2).zfill(2)}"
years_days = re.sub(pattern, add_zero, dates)
# 获取其他字段
report_type = ''
first_category_name = ''
second_category_name = ''
for raw_num in range(row_number + 1, total_raws_num):
try:
item = {}
# item = {'id': ""}
# 公司名称
item['公司'] = company_name.strip()
# 获取产业名称
item['产业'] = industry_mapping(item['公司'])
# 年月
item['年月'] = years_days
# 按"上年同期"是否有值作结束标准
try:
if math.isnan(df.iloc[raw_num, 4]) and math.isnan(df.iloc[raw_num, 1]):
break
except:
pass
first_column = df.iloc[raw_num, 0].strip() # 第一列字段内容
# 报表类型处理
if [keyword for keyword in ['一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、'] if
keyword in first_column]:
report_type = first_column.split('、')[-1].strip()
continue
# 一级类目名称
elif [keyword for keyword in ['0.', '1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.'] if
keyword in first_column]:
first_category_name = first_column.split('.')[-1].strip()
first_category_name, remarks = processing_bracket(first_category_name)
else:
# 二级类目名称
second_category_name = first_column.strip()
# 处理括号内容
second_category_name, remarks = processing_bracket(second_category_name)
# id
# item['id'] = str(self.id)
# 类目名称
item['报表类型'] = report_type
item['一级类目名称'] = first_category_name
item['二级类目名称'] = second_category_name
item['类目名称'] = second_category_name if second_category_name else first_category_name
# 备注
item['备注'] = remarks
# 本月数
number_of_current_month = df.iloc[raw_num, 2]
item['本月数'] = "%.2f" % 0 if "—" in str(
number_of_current_month) else "%.2f" % number_of_current_month
# 本年累计
current_year_cumulative = df.iloc[raw_num, 3]
item['本年累计'] = "%.2f" % 0 if "—" in str(
current_year_cumulative) else "%.2f" % current_year_cumulative
# 上年同期
the_same_period_of_last_year = df.iloc[raw_num, 4]
item['上年同期'] = "%.2f" % 0 if "—" in str(
the_same_period_of_last_year) else "%.2f" % the_same_period_of_last_year
# 更新时间
item['更新时间'] = str(datetime.datetime.now())[:19]
# print(json.dumps(item, indent=4, ensure_ascii=False))
# 存储数据入库
self.output_list.append(item)
# self.id += 1
except Exception as e:
logger.error(e)
# def to_database(self, item):
# ZhongChe.create(**item)
def get_file_list(self, path):
"""
获取excel文件列表
:param path:
:return:
"""
files = os.listdir(path)
for file in files:
# 拼接文件的完整路径
file_path = os.path.join(path, file)
# 判断文件类型
if os.path.isfile(file_path): # 是否是文件
logger.info(file_path)
suffix = file_path.split('.')[-1]
if suffix == 'zip':
logger.info("当前文件为压缩件,进行解压操作...")
zip_file_path = file_path
target_path = ''.join(file_path.split('.')[:-1])
# 打开ZIP文件
zip_file = zipfile.ZipFile(zip_file_path)
# 解压所有文件到目标路径,并指定文件名编码
for file_info in zip_file.infolist():
file_info.filename = file_info.filename.encode('cp437').decode('gbk', errors='ignore')
zip_file.extract(file_info, target_path)
# 关闭ZIP文件
zip_file.close()
# 再次遍历该路径
self.get_file_list(target_path)
elif suffix.lower() == 'xlsx' or suffix.lower() == 'xls':
self.excel_file_list.append(file_path)
else:
...
logger.error(FileNotFoundError("!!文件类型错误!!文件非zip或xlsx/xls格式!"))
else:
self.get_file_list(file_path)
def main(self):
"""
读取文件路径
:return:
"""
# 获取文件列表
self.get_file_list(self.file_root)
# self.excel_file_list = list(set(self.excel_file_list))
print(json.dumps(self.excel_file_list, ensure_ascii=False, indent=4))
# 解析文件
for excel_file_path in self.excel_file_list:
self.analysis_excel(excel_file_path)
return self.output_list
if __name__ == '__main__':
# file_path = r'/home/rhino/zhongche_data/'
file_path = r"D:\workspace\驱动及脚本\山东中车项目支持\新建文件夹"
output_list = AnalysisExcel(file_root=file_path).main()
# key_names = ['公司'.encode("GBK").decode("GBK"), 'industry', 'years_days', 'report_type', 'first_category_name', 'second_category_name',
# 'category_name', 'number_of_current_month', 'current_year_cumulative', 'the_same_period_of_last_year', 'remarks',
# 'update_time']
# pd.set_option('display.unicode.east_asian_width', False)
key_names = ['公司', '产业', '年月', '报表类型', '一级类目名称', '二级类目名称', '类目名称', '本月数', '本年累计', '上年同期', '备注', '更新时间']
output_dataframe = pd.DataFrame(output_list, columns=key_names)
# output_dataframe.to_excel()
print(output_dataframe)
服辣,你可真牛逼