import asyncio
import time
from loguru import logger
# Add a file sink: records go to out.log, the file is rotated at 100 MB, and
# the backtrace/diagnose options are switched on for this sink.
logger.add("out.log", rotation="100 MB", backtrace=True, diagnose=True)
from subprocess import call
from multiprocessing import Process
from fake_useragent import UserAgent
from pyppeteer import launch
# Module-level UserAgent instance; Spider.__init__ reads ua.random to fill the
# "user-agent" entry of its header dict.
ua = UserAgent()
class Spider:
    """Repeatedly visit one Naver Shopping catalog page with headless Chromium.

    ``get_html`` performs a single visit (launch browser, open page, click one
    link, scroll, close browser); ``run`` repeats that visit forever.
    """

    def __init__(self):
        """Store the target URL and a header dict with a random user agent."""
        self.url = "https://search.shopping.naver.com/catalog/19374887080?query=ADATA%20HD330%20USB3.1&NaPm=ct%3Dl4i53xi8%7Cci%3D7610a9621d487f37cd7a8e1cf287613ee02f1b25%7Ctr%3Dslsl%7Csn%3D95694%7Chk%3D45f91bfab8555ace9e5d7236982a0e7fffc115b0"
        # NOTE(review): nothing in this class reads self.headers; get_html sets
        # a fixed user agent through page.setUserAgent instead. Kept because
        # code outside this class may rely on the attribute.
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "pragma": "no-cache",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": ua.random,
        }

    async def get_html(self, i, num):
        """Visit ``self.url`` once: open the page, click one link and scroll.

        Args:
            i: Iteration counter; only used in the success log line.
            num: Worker label; only used in the success log line.

        Any error raised after the browser has been launched is logged here
        and not re-raised. The browser is closed exactly once on every path.
        An error raised by ``launch`` itself propagates to the caller.
        """
        browser = await launch(
            headless=True,
            dumpio=True,
            autoClose=True,
            devtools=False,
            # NOTE(review): every worker process passes this same profile
            # directory -- confirm that concurrent Chromium instances can
            # share it.
            userDataDir=r"D:\temporary",
            args=[
                '--disable-infobars',
                '--proxy-server=u6213.20.tn.16yun.cn:****',
                '--disable-setuid-sandbox',
                '--window-size=1366,850',
            ],
        )
        try:
            # Page setup lives inside the try block so that a failure here
            # still reaches the finally clause and the browser is not leaked.
            page = await browser.newPage()
            await page.setViewport({'width': 1366, 'height': 768})
            # Credentials for the proxy given via --proxy-server above.
            await page.authenticate({
                'username': '******',
                'password': '******'
            })
            await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36')

            await page.goto(self.url, {'timeout': 30 * 1000})
            page_title = await page.title()
            # Only interact with the page when the expected product page was
            # actually served (the title contains the product name).
            if "ADATA HD330 USB3.1" in page_title:
                # Click a specific element: the link in the fourth row of the
                # table inside div.condition_area.
                selector = '#__next > div > div.style_container__3iYev > div.style_inner__1Eo2z > div.style_content_wrap__2VTVx > div.style_content__36DCX > div > div.summary_info_area__3XT5U > div.condition_area > table > tbody > tr:nth-child(4) > td.productByMall_mall_area__1oEU_ > div > a'
                await page.click(selector)
                await asyncio.sleep(2)
                await page.evaluate('window.scrollTo(0,200)')
                await page.evaluate('window.scrollTo(200,400)')
                await page.evaluate('window.scrollTo(400,800)')
                logger.info("进程{}:刷新{}次".format(num, i))
                # Non-blocking pause; time.sleep would stall the event loop.
                await asyncio.sleep(2)
        except Exception as e:
            # logger.exception keeps the traceback, which the file sink's
            # backtrace/diagnose options are meant to render.
            logger.exception(e)
        finally:
            # Single close point for success and failure alike.
            await browser.close()

    def run(self, num):
        """Call ``get_html`` in an endless loop; this method never returns.

        Args:
            num: Worker label forwarded to ``get_html`` for its log line.

        Errors escaping ``get_html`` (for example a failed browser launch) are
        logged and the loop continues with the next iteration.
        """
        # Create the event loop explicitly instead of relying on the
        # deprecated implicit creation done by asyncio.get_event_loop().
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        i = 1
        while True:
            try:
                loop.run_until_complete(self.get_html(i, num))
            except Exception as e:
                logger.exception(e)
            i += 1
if __name__ == '__main__':
    # Start five worker processes. Each one gets its own Spider instance and
    # its index, passed as a string, which Spider.run forwards for logging.
    # (For a single-process debug run, call Spider().run(2) directly instead.)
    for worker_index in range(5):
        spider = Spider()
        worker = Process(target=spider.run, args=(str(worker_index),))
        worker.start()