"""
@author:***
@date:2022-6-29 10:55:22
@function:
/ftp/..
"""
import base64
import io
import os
import shutil
import pymysql
from PIL import Image
from loguru import logger
import traceback
from openpyxl import Workbook
import psycopg2
import psycopg2.extras
from config import *
# Configure log rotation: start a new log file every 100 MB
logger.add("ftp_upload_utils.log", rotation="100 MB", backtrace=True, diagnose=True)
def compress_image_bs4(b64, mb=23, k=0.9):
"""不改变图片尺寸压缩到指定大小
:param outfile: 压缩文件保存地址
:param mb: 压缩目标,KB
:param step: 每次调整的压缩比率
:param quality: 初始压缩比率
:return: 压缩文件地址,压缩文件大小
"""
f = base64.b64decode(b64)
with io.BytesIO(f) as im:
o_size = len(im.getvalue()) // 1024
if o_size <= mb:
return b64
im_out = im
while o_size > mb:
img = Image.open(im_out)
x, y = img.size
            out = img.resize((int(x * k), int(y * k)), Image.LANCZOS)  # Image.ANTIALIAS was removed in Pillow 10
im_out.close()
im_out = io.BytesIO()
if out.mode == "RGBA":
out.save(im_out, 'png')
elif out.mode == "RGB":
out.save(im_out, 'jpg')
else:
return ""
o_size = len(im_out.getvalue()) // 1024
b64 = base64.b64encode(im_out.getvalue())
im_out.close()
return str(b64, encoding='utf8')
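# Usage sketch (the file name and values here are hypothetical, not part of the pipeline):
# with open("logo.png", "rb") as fh:
#     raw_b64 = base64.b64encode(fh.read()).decode("utf8")
# small_b64 = compress_image_bs4(raw_b64, mb=23, k=0.9)  # downsize until the payload is ~23 KB or less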
class DataBaseCon:
"""
数据库操作
"""
def __init__(self, table_name, base_info):
self.table_name = table_name
self.base_info = base_info
def connectMysql(self):
"""
Mysql数据库连接
"""
for _ in range(10):
try:
conn = pymysql.connect(**self.base_info)
                # Return rows as dictionaries instead of tuples
cursor = conn.cursor(pymysql.cursors.DictCursor)
return conn, cursor
except Exception as err:
logger.exception("数据库连接失败, %s" % err)
return None, None
def connectPostgreSQL(self):
"""
PG库连接
"""
for _ in range(10):
try:
conn = psycopg2.connect(**self.base_info)
cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return conn, cursor
except Exception as err:
logger.exception("数据库连接失败, %s" % err)
return None, None
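# Connection-info sketch: base_info is passed straight to pymysql.connect / psycopg2.connect, so its
# keys follow those drivers' keyword arguments. The values below are placeholders; the real dicts
# (e.g. MY_SQL_LOCAL_SCHOOL, PG_SQL_LOCAL) are imported from config.py.
# MY_SQL_LOCAL_SCHOOL = {"host": "127.0.0.1", "port": 3306, "user": "root",
#                        "password": "***", "database": "school", "charset": "utf8mb4"}
# PG_SQL_LOCAL = {"host": "127.0.0.1", "port": 5432, "user": "postgres",
#                 "password": "***", "dbname": "industry_chain"}
# conn, cursor = DataBaseCon("school_base_info", MY_SQL_LOCAL_SCHOOL).connectMysql()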
class Upload:
def __init__(self, table_name, base_info, base_type):
self.table_name = table_name
self.base_info = base_info
self.out_path = "/ftp/industry_chain_data/"
# self.out_path = "./"
self.base_type = base_type
def get_data(self):
"""
获取数据
:param table_name: 表名
:param base_info: 数据库信息
:return:
"""
fields = ""
all_data = ""
for _ in range(10):
try:
if self.base_type == "mysql":
conn, cursor = DataBaseCon(self.table_name, self.base_info).connectMysql()
elif self.base_type == "pgsql":
conn, cursor = DataBaseCon(self.table_name, self.base_info).connectPostgreSQL()
                else:
                    raise TypeError("Unsupported database type: %s" % self.base_type)
sql = 'select * from %s order by id' % self.table_name
cursor.execute(sql)
fields = [field[0] for field in cursor.description]
all_data = cursor.fetchall()
if all_data:
break
except Exception as err:
logger.error('traceback.format_exc():\n%s' % traceback.format_exc())
logger.error(err)
finally:
try:
cursor.close()
conn.close()
except:
pass
return fields, all_data
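    # Return-shape sketch (illustrative values only):
    #   fields   -> ['id', 'name', 'logo', ...]
    #   all_data -> [{'id': 1, 'name': '...', 'logo': '<base64 string>'}, ...]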
def get_file_name(self):
if self.base_type == "mysql":
file_name = self.table_name
        elif self.base_type == "pgsql":
            # PostgreSQL tables are referenced as "schema.table"; keep only the table part
            file_name = self.table_name.split('.')[1]
else:
file_name = ''
return file_name
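    # Example (sketch): a pgsql table passed as "industry_chain.industry_info" yields the file
    # name "industry_info"; a mysql table name is used unchanged.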
def to_excel(self):
"""
将数据导出到excel
:return:
"""
logger.info("{}:正在读取数据,准备导出Excel...".format(self.table_name))
fields, all_data = self.get_data()
# 实例化
wb = Workbook()
# 创建表(sheet)
sheet = wb.worksheets[0]
# 写入标题
row = 1
for col, field in enumerate(fields):
sheet.cell(row=row, column=col+1, value=field)
# 写入数据
        length = len(all_data)
        for data in all_data:
            for col, field in enumerate(data):
                try:
                    # Excel cells hold at most 32,767 characters; oversized base64 logos are compressed
                    cell_message = data[field]
                    if isinstance(cell_message, str) and len(cell_message) > 32767:
                        cell_message = compress_image_bs4(b64=cell_message)
                    sheet.cell(row + 1, col + 1, cell_message)
                except Exception as e:
                    sheet.cell(row + 1, col + 1, '')
                    logger.error(e)
            logger.info("Table {0} export progress: {1:.2f}%".format(self.table_name, row * 100 / length))
            row += 1
        # Save the results
        file_name = self.get_file_name()
        path = self.out_path + file_name
        logger.info("Saving to {} ...".format(path))
        # Remove any existing output directory, then create a fresh one
        if os.path.exists(path):
            shutil.rmtree(path, True)
        os.mkdir(path)
        # Write the workbook into the table's own directory
        wb.save("%s.xlsx" % (path + "/" + file_name))
        logger.info("{}: saved".format(file_name))
def to_txt(self):
"""
将数据导出到.txt文件
:return:
"""
logger.info("{}:正在读取数据,准备导出TXT文本...".format(self.table_name))
fields, all_data = self.get_data()
file_name = self.get_file_name()
path = self.out_path + file_name
        # Remove any existing output directory, then create a fresh one
        if os.path.exists(path):
            shutil.rmtree(path, True)
        os.mkdir(path)
        fp = open("%s.txt" % (path + "/" + file_name), "wb+")
        # Write the header row: field names separated by tabs (format: A\tB\tC\n)
        line = '\t'.join([str(field) for field in fields]) + "\n"
        fp.write(line.encode('utf8'))  # write as UTF-8
        # Write the data rows
        length = len(all_data)
        for row, data in enumerate(all_data):
            line = '\t'.join([str(val) for val in data.values()]) + "\n"  # format: A\tB\tC\n
            fp.write(line.encode('utf8'))  # write as UTF-8
            logger.info("Table {0} export progress: {1:.2f}%".format(self.table_name, (row + 1) * 100 / length))
fp.close()
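# Output layout sketch (derived from out_path and get_file_name above):
#   /ftp/industry_chain_data/<table>/<table>.xlsx   produced by to_excel()
#   /ftp/industry_chain_data/<table>/<table>.txt    produced by to_txt()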
if __name__ == '__main__':
# Upload(table_name="51job_jiangsu_all", base_info=MY_SQL_LOCAL_LXC, base_type="mysql").to_excel()
# Upload(table_name="school_base_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_excel()
# Upload(table_name="school_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_excel()
# Upload(table_name="school_intro_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_excel()
# Upload(table_name="school_professional_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_excel()
# Upload(table_name="school_professional_intro_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_excel()
# Upload(table_name="industry_chain.industry_info", base_info=PG_SQL_LOCAL, base_type="pgsql").to_excel()
# Upload(table_name="industry_chain.industry_info", base_info=PG_SQL_LOCAL, base_type="pgsql").to_txt()
# Upload(table_name="school_professional_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_txt()
Upload(table_name="school_professional_intro_detail_info", base_info=MY_SQL_LOCAL_SCHOOL, base_type="mysql").to_txt()