python 数据库表导出excel、txt
python 数据库表导出excel、txt

python 数据库表导出excel、txt

"""
@author:***
@date:2022-6-29 10:55:22
@function:

/ftp/..
"""
import base64
import io
import os
import shutil
import pymysql
from PIL import Image
from loguru import logger
import traceback
from openpyxl import Workbook
import psycopg2
import psycopg2.extras
from config import *
# Log to a file as well, rotating it at the configured size limit.
logger.add(
    "ftp_upload_utils.log",
    rotation="100 MB",
    backtrace=True,
    diagnose=True,
)

def compress_image_bs4(b64, mb=23, k=0.9):
    """Shrink a base64-encoded image until its decoded size fits a budget.

    The image is repeatedly scaled down by ``k`` (width and height alike) and
    re-encoded until the encoded bytes occupy at most ``mb`` KB. RGBA images
    are re-encoded as PNG, RGB images as JPEG.

    :param b64: base64-encoded image data (``str`` or ``bytes``)
    :param mb: size budget for the decoded image, in KB (despite the name)
    :param k: scale factor applied to both dimensions on every pass;
        must satisfy ``0 < k < 1`` whenever shrinking is actually needed
    :return: ``b64`` itself when the image already fits the budget;
        otherwise the shrunk image as a base64 ``str``; ``""`` when the image
        mode is neither RGB nor RGBA or the image cannot be shrunk any further
    :raises ValueError: if shrinking is needed and ``k`` is not in ``(0, 1)``
    """
    raw = base64.b64decode(b64)
    size_kb = len(raw) // 1024
    if size_kb <= mb:
        return b64

    # A factor >= 1 would never shrink the image and loop forever.
    if not 0 < k < 1:
        raise ValueError("k must be between 0 and 1 (exclusive), got %r" % (k,))

    while size_kb > mb:
        with io.BytesIO(raw) as src:
            img = Image.open(src)
            width, height = img.size
            new_size = (int(width * k), int(height * k))
            if min(new_size) < 1:
                # Nothing left to shrink; give up instead of failing in resize.
                return ""
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
            shrunk = img.resize(new_size, Image.LANCZOS)

        if shrunk.mode == "RGBA":
            fmt = "PNG"
        elif shrunk.mode == "RGB":
            # Pillow registers the format as "JPEG"; "jpg" is not a valid name.
            fmt = "JPEG"
        else:
            return ""

        with io.BytesIO() as dst:
            shrunk.save(dst, fmt)
            raw = dst.getvalue()
        size_kb = len(raw) // 1024

    return base64.b64encode(raw).decode("utf8")

class DataBaseCon:
    """Open MySQL / PostgreSQL connections, retrying a bounded number of times.

    Both ``connect*`` methods return a ``(connection, cursor)`` pair whose
    cursor yields rows as dictionaries, or ``(None, None)`` when every attempt
    failed. They never raise on connection errors; failures are logged.
    """

    # Maximum number of connection attempts before giving up.
    MAX_RETRIES = 10

    def __init__(self, table_name, base_info):
        """
        :param table_name: name of the table the caller intends to query
        :param base_info: keyword arguments forwarded to the driver's ``connect``
        """
        self.table_name = table_name
        self.base_info = base_info

    def _connect_with_retry(self, open_connection):
        """Call ``open_connection`` until it succeeds or retries are exhausted.

        :param open_connection: zero-argument callable returning ``(conn, cursor)``
        :return: the pair returned by ``open_connection``, or ``(None, None)``
        """
        for _ in range(self.MAX_RETRIES):
            try:
                return open_connection()
            except Exception as err:
                logger.exception("数据库连接失败,  %s" % err)
        return None, None

    def _open_mysql(self):
        """Open one MySQL connection with a dictionary cursor."""
        conn = pymysql.connect(**self.base_info)
        try:
            # DictCursor returns every row as a dict keyed by column name.
            cursor = conn.cursor(pymysql.cursors.DictCursor)
        except Exception:
            conn.close()
            raise
        return conn, cursor

    def _open_postgresql(self):
        """Open one PostgreSQL connection with a dictionary cursor."""
        conn = psycopg2.connect(**self.base_info)
        try:
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        except Exception:
            conn.close()
            raise
        return conn, cursor

    def connectMysql(self):
        """
        Connect to MySQL.

        :return: ``(connection, cursor)``, or ``(None, None)`` after
            ``MAX_RETRIES`` failed attempts
        """
        return self._connect_with_retry(self._open_mysql)

    def connectPostgreSQL(self):
        """
        Connect to PostgreSQL.

        :return: ``(connection, cursor)``, or ``(None, None)`` after
            ``MAX_RETRIES`` failed attempts
        """
        return self._connect_with_retry(self._open_postgresql)

class Upload:
    """Export one database table to an Excel workbook or a tab-separated text file.

    Files are written to ``<out_path>/<file_name>/<file_name>.xlsx`` (or
    ``.txt``). Each export first deletes and recreates that directory, so
    running ``to_txt`` after ``to_excel`` for the same table removes the
    earlier ``.xlsx`` (and vice versa).
    """

    # Strings longer than Excel's per-cell character limit are treated as
    # base64 images and compressed before being written.
    EXCEL_MAX_CELL_CHARS = 32767
    # Maximum number of attempts to run the export query.
    MAX_QUERY_ATTEMPTS = 10

    def __init__(self, table_name, base_info, base_type):
        """
        :param table_name: table to export (``schema.table`` for pgsql)
        :param base_info: keyword arguments for the database driver's ``connect``
        :param base_type: ``"mysql"`` or ``"pgsql"``
        """
        self.table_name = table_name
        self.base_info = base_info
        self.out_path = "/ftp/industry_chain_data/"
        # self.out_path = "./"
        self.base_type = base_type

    def get_data(self):
        """
        Fetch every row of the table, ordered by ``id``.

        The query is attempted up to ``MAX_QUERY_ATTEMPTS`` times and stops at
        the first attempt that executes successfully. Errors are logged, never
        raised.

        :return: ``(fields, rows)`` - the column names and the rows as
            dictionaries; both are empty when the data could not be read
        """
        fields, all_data = [], []

        connector = DataBaseCon(self.table_name, self.base_info)
        if self.base_type == "mysql":
            connect = connector.connectMysql
        elif self.base_type == "pgsql":
            connect = connector.connectPostgreSQL
        else:
            # Retrying cannot help with an unsupported database type.
            logger.error("当前数据库类型无法解析")
            return fields, all_data

        # NOTE: identifiers cannot be bound as query parameters, so the table
        # name is interpolated directly; it must come from trusted code only.
        sql = 'select * from %s order by id' % self.table_name

        for _ in range(self.MAX_QUERY_ATTEMPTS):
            conn = cursor = None
            try:
                conn, cursor = connect()
                if conn is None or cursor is None:
                    # The connector has already logged why it failed.
                    continue
                cursor.execute(sql)
                fields = [field[0] for field in cursor.description]
                all_data = cursor.fetchall()
                break
            except Exception as err:
                logger.error('traceback.format_exc():\n%s' % traceback.format_exc())
                logger.error(err)
            finally:
                # Release whatever was opened, without masking the real error.
                for resource in (cursor, conn):
                    if resource is None:
                        continue
                    try:
                        resource.close()
                    except Exception as close_err:
                        logger.warning(close_err)
        return fields, all_data

    def get_file_name(self):
        """
        Derive the output file / directory name from the table name.

        :return: the table name for mysql; the part after the last ``.`` for
            pgsql (the whole name when it has no schema prefix); ``''`` for
            any other database type
        """
        if self.base_type == "mysql":
            return self.table_name
        if self.base_type == "pgsql":
            return self.table_name.split('.')[-1]
        return ''

    def _prepare_output_dir(self, file_name):
        """Recreate the table's output directory empty and return its path."""
        path = os.path.join(self.out_path, file_name)
        if os.path.exists(path):
            shutil.rmtree(path, True)
        # makedirs also creates a missing out_path and tolerates a directory
        # that rmtree could not fully remove.
        os.makedirs(path, exist_ok=True)
        return path

    @staticmethod
    def _excel_value(value):
        """Return ``value`` ready for a cell, compressing over-long strings."""
        if isinstance(value, (str, bytes)) and len(value) > Upload.EXCEL_MAX_CELL_CHARS:
            # Over-long text is assumed to be a base64 logo image.
            return compress_image_bs4(b64=value)
        return value

    @staticmethod
    def _format_row(values):
        """Encode ``values`` as one UTF-8 line of the form ``A\\tB\\tC\\n``."""
        return ('\t'.join(str(val) for val in values) + "\n").encode('utf8')

    def to_excel(self):
        """
        Export the table to ``<out_path>/<file_name>/<file_name>.xlsx``.

        Values that cannot be written to a cell are replaced by an empty
        string and the error is logged.

        :return: None
        """
        logger.info("{}:正在读取数据,准备导出Excel...".format(self.table_name))
        file_name = self.get_file_name()
        if not file_name:
            # An empty name would make the output directory out_path itself,
            # which the export would then delete.
            logger.error("当前数据库类型无法解析")
            return
        fields, all_data = self.get_data()

        wb = Workbook()
        sheet = wb.worksheets[0]

        # Header row.
        for col, field in enumerate(fields, start=1):
            sheet.cell(row=1, column=col, value=field)

        # Data rows start on the second sheet row.
        total = len(all_data)
        for done, record in enumerate(all_data, start=1):
            for col, field in enumerate(record, start=1):
                try:
                    sheet.cell(row=done + 1, column=col,
                               value=self._excel_value(record[field]))
                except Exception as err:
                    sheet.cell(row=done + 1, column=col, value='')
                    logger.error(err)
            logger.info("表{0}".format(self.table_name) + "导出进度:%.2f" % (done * 100 / total) + "%")

        path = os.path.join(self.out_path, file_name)
        logger.info("正在保存至{} ...".format(path))
        path = self._prepare_output_dir(file_name)
        wb.save(os.path.join(path, file_name + ".xlsx"))
        logger.info("{}:保存完毕".format(file_name))

    def to_txt(self):
        """
        Export the table to ``<out_path>/<file_name>/<file_name>.txt``.

        The file is UTF-8 encoded: one tab-separated header line with the
        column names, then one tab-separated line per row.

        :return: None
        """
        logger.info("{}:正在读取数据,准备导出TXT文本...".format(self.table_name))
        file_name = self.get_file_name()
        if not file_name:
            # An empty name would make the output directory out_path itself,
            # which the export would then delete.
            logger.error("当前数据库类型无法解析")
            return
        fields, all_data = self.get_data()
        path = self._prepare_output_dir(file_name)

        total = len(all_data)
        with open(os.path.join(path, file_name + ".txt"), "wb") as fp:
            if fields:
                fp.write(self._format_row(fields))
            for done, record in enumerate(all_data, start=1):
                fp.write(self._format_row(record.values()))
                logger.info("表{0}".format(self.table_name) + "导出进度:%.2f" % (done * 100 / total) + "%")

if __name__ == '__main__':
    # Manual entry point: exports a single table per run.
    # Earlier runs, kept for reference (finish each with .to_excel() or .to_txt()):
    #   Upload(table_name="51job_jiangsu_all", base_info=MY_SQL_LOCAL_LXC, base_type="mysql")
    #   Upload(table_name="school_base_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql")
    #   Upload(table_name="school_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql")
    #   Upload(table_name="school_intro_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql")
    #   Upload(table_name="school_professional_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql")
    #   Upload(table_name="industry_chain.industry_info", base_info=PG_SQL_LOCAL, base_type="pgsql")
    exporter = Upload(
        table_name="school_professional_intro_detail_info",
        base_info=MY_SQL_LOCAL_SCHOOL,
        base_type="mysql",
    )
    exporter.to_txt()

发表回复

您的电子邮箱地址不会被公开。