pandas解析excel文件
pandas解析excel文件

pandas解析excel文件

# -*- coding: UTF-8 -*-
"""=========================================================
@Project -> File: ReportAnalysis -> AnalysisMain
@IDE: PyCharm
@author: lxc
@date: 2023-10-24 下午 3:02
@Desc:
1-功能描述:
中车项目支持
将excel文件路径添加到一个列表中,再遍历该列表,依次处理表格内容
2-实现步骤
    1- 读取文件根路径,目的是添加文件路径列表:
        如果碰到压缩包,则解压该文件,再处理解压后的路径,递归该方法;
        如果碰到excel文件(不论是xls还是xlsx),则添加进文件路径列表;
        如果碰到路径,则再递归该方法
        这样一来便得到了该路径下的所有excel文件
    2- 处理excel文件,按指定需求将表格内容提取成需求字段对应的字典
    3- 将item元数据入库
3-包说明:
    1- 测试数据库使用了peewee连接sqlite
    2- 使用了pandas处理excel表格,读取xls和xlsx分别依靠openpyxl、xlrd引擎
"""
import json
import re
import os
import zipfile
import pandas as pd
import math
import logging
import datetime
# from utils.model import ZhongChe

logger = logging.getLogger()

def industry_mapping(company_name):
    """
    公司-产业关系
    :return: industry_name
    """
    mapping = {
        "中车山东机车车辆有限公司(合并)": "轨道交通装备产业(货车)",
        "中车山东机车车辆有限公司(差额)": "轨道交通装备产业(货车)",
        "中车山东机车车辆有限公司(个体)": "轨道交通装备产业(货车)",
        "中车山东机车车辆有限公司": "轨道交通装备产业(货车)",
        "中车山东风电有限公司(合并)": "风电产业",
        "中车山东风电有限公司(差额)": "风电产业",
        "中车山东风电有限公司(个体)": "风电产业",
        "中车山东风电有限公司": "风电产业",
        "吉林中车风电装备有限公司": "风电产业",
        "山东中车同力钢构有限公司": "风电产业",
        "河北中车风电装备有限公司": "风电产业",
        "山东中车同力达智能装备有限公司": "智能装备产业",
        "江苏中车华腾环保科技有限公司": "环保产业",
        "常熟中车水务有限公司": "环保产业",
        "丹棱中车水务有限公司": "环保产业",
        "常熟中车村镇水务有限公司": "环保产业",
        "常熟中车环保水务有限公司": "环保产业",
        "中车风电(锡林郭勒)有限公司": "风电产业"
    }
    return mapping.get(company_name, '')

def processing_bracket(string):
    """
    处理字符串中的括号内容,不论中英文括号
    返回去除括号内容的新字符串,以及括号内容的集合
    :param string:
    :return:
    """
    # 处理△符号/'其中:'
    string = string.replace("△", '').replace("其中:", '')
    pattern = r'[((].+?[))]'
    matches = re.findall(pattern, string)
    remarks = ''.join(matches)
    for matche in matches:
        string = string.replace(matche, '')
    return string, remarks

class AnalysisExcel:
    def __init__(self, file_root):
        # self.file_root = config.FILES_PATH
        self.excel_file_list = []
        self.file_root = file_root
        self.output_list = []
        self.id = 1
    def analysis_excel(self, path):
        suffix = path.split('.')[-1]
        if suffix.lower() == 'xlsx':
            df = pd.read_excel(path, engine="openpyxl", sheet_name=0)
        else:
            df = pd.read_excel(path, engine="xlrd", sheet_name=0)
        total_raws_num = df.shape[0]
        try:
            row_number = [index for index, row in df.iterrows() if '累计' in str(row.values)][0]
        except ValueError:
            raise ValueError("首行读取错误。未找到'累计'出现的首行")
        # 获取公司名称
        companys = df.iloc[row_number - 1, 0]
        if ':' in companys:
            company_name = companys.split(':')[-1]
        elif ':' in companys:
            company_name = companys.split(':')[-1]
        else:
            company_name = companys
        # 获取年月字段
        dates = df.iloc[row_number - 1, 2]
        pattern = r'(\d{4})年(\d{1,2})月'

        def add_zero(match):
            # 自定义替换函数来确保月份为两位数字
            return f"{match.group(1)}-{match.group(2).zfill(2)}"
        years_days = re.sub(pattern, add_zero, dates)
        # 获取其他字段
        report_type = ''
        first_category_name = ''
        second_category_name = ''
        for raw_num in range(row_number + 1, total_raws_num):
            try:
                item = {}
                # item = {'id': ""}
                # 公司名称
                item['公司'] = company_name.strip()
                # 获取产业名称
                item['产业'] = industry_mapping(item['公司'])
                # 年月
                item['年月'] = years_days
                # 按"上年同期"是否有值作结束标准
                try:
                    if math.isnan(df.iloc[raw_num, 4]) and math.isnan(df.iloc[raw_num, 1]):
                        break
                except:
                    pass
                first_column = df.iloc[raw_num, 0].strip()  # 第一列字段内容
                # 报表类型处理
                if [keyword for keyword in ['一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、'] if
                    keyword in first_column]:
                    report_type = first_column.split('、')[-1].strip()
                    continue
                # 一级类目名称
                elif [keyword for keyword in ['0.', '1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.'] if
                      keyword in first_column]:
                    first_category_name = first_column.split('.')[-1].strip()
                    first_category_name, remarks = processing_bracket(first_category_name)

                else:
                    # 二级类目名称
                    second_category_name = first_column.strip()
                    # 处理括号内容
                    second_category_name, remarks = processing_bracket(second_category_name)
                # id
                # item['id'] = str(self.id)
                # 类目名称
                item['报表类型'] = report_type
                item['一级类目名称'] = first_category_name
                item['二级类目名称'] = second_category_name
                item['类目名称'] = second_category_name if second_category_name else first_category_name
                # 备注
                item['备注'] = remarks
                # 本月数
                number_of_current_month = df.iloc[raw_num, 2]
                item['本月数'] = "%.2f" % 0 if "—" in str(
                    number_of_current_month) else "%.2f" % number_of_current_month
                # 本年累计
                current_year_cumulative = df.iloc[raw_num, 3]
                item['本年累计'] = "%.2f" % 0 if "—" in str(
                    current_year_cumulative) else "%.2f" % current_year_cumulative
                # 上年同期
                the_same_period_of_last_year = df.iloc[raw_num, 4]
                item['上年同期'] = "%.2f" % 0 if "—" in str(
                    the_same_period_of_last_year) else "%.2f" % the_same_period_of_last_year
                # 更新时间
                item['更新时间'] = str(datetime.datetime.now())[:19]
                # print(json.dumps(item, indent=4, ensure_ascii=False))
                # 存储数据入库
                self.output_list.append(item)
                # self.id += 1
            except Exception as e:
                logger.error(e)

    # def to_database(self, item):
    #     ZhongChe.create(**item)

    def get_file_list(self, path):
        """
        获取excel文件列表
        :param path:
        :return:
        """
        files = os.listdir(path)
        for file in files:
            # 拼接文件的完整路径
            file_path = os.path.join(path, file)
            # 判断文件类型
            if os.path.isfile(file_path):  # 是否是文件
                logger.info(file_path)
                suffix = file_path.split('.')[-1]
                if suffix == 'zip':
                    logger.info("当前文件为压缩件,进行解压操作...")
                    zip_file_path = file_path
                    target_path = ''.join(file_path.split('.')[:-1])
                    # 打开ZIP文件
                    zip_file = zipfile.ZipFile(zip_file_path)
                    # 解压所有文件到目标路径,并指定文件名编码
                    for file_info in zip_file.infolist():
                        file_info.filename = file_info.filename.encode('cp437').decode('gbk', errors='ignore')
                        zip_file.extract(file_info, target_path)
                    # 关闭ZIP文件
                    zip_file.close()
                    # 再次遍历该路径
                    self.get_file_list(target_path)
                elif suffix.lower() == 'xlsx' or suffix.lower() == 'xls':
                    self.excel_file_list.append(file_path)
                else:
                    ...
                    logger.error(FileNotFoundError("!!文件类型错误!!文件非zip或xlsx/xls格式!"))
            else:
                self.get_file_list(file_path)

    def main(self):
        """
        读取文件路径
        :return:
        """
        # 获取文件列表
        self.get_file_list(self.file_root)
        # self.excel_file_list = list(set(self.excel_file_list))
        print(json.dumps(self.excel_file_list, ensure_ascii=False, indent=4))
        # 解析文件
        for excel_file_path in self.excel_file_list:
            self.analysis_excel(excel_file_path)
        return self.output_list

if __name__ == '__main__':
    # file_path = r'/home/rhino/zhongche_data/'
    file_path = r"D:\workspace\驱动及脚本\山东中车项目支持\新建文件夹"
    output_list = AnalysisExcel(file_root=file_path).main()
    # key_names = ['公司'.encode("GBK").decode("GBK"), 'industry', 'years_days', 'report_type', 'first_category_name', 'second_category_name',
    #  'category_name', 'number_of_current_month', 'current_year_cumulative', 'the_same_period_of_last_year', 'remarks',
    #  'update_time']
    # pd.set_option('display.unicode.east_asian_width', False)
    key_names = ['公司', '产业', '年月', '报表类型', '一级类目名称', '二级类目名称', '类目名称', '本月数', '本年累计', '上年同期', '备注', '更新时间']
    output_dataframe = pd.DataFrame(output_list, columns=key_names)
    # output_dataframe.to_excel()
    print(output_dataframe)

一条评论

回复 666 取消回复

您的电子邮箱地址不会被公开。