pyppeteer通用模板
pyppeteer通用模板

pyppeteer通用模板


import asyncio
import time
from loguru import logger
# Register a file sink: log records also go to "out.log", rotated at 100 MB,
# with loguru's backtrace and diagnose options enabled for logged exceptions.
logger.add("out.log", rotation="100 MB", backtrace=True, diagnose=True)

from subprocess import call
from multiprocessing import Process
from fake_useragent import UserAgent
from pyppeteer import launch
# Module-level UserAgent instance; Spider.__init__ reads `ua.random` from it
# when building its headers dict.
ua = UserAgent()
class Spider:
    """Repeatedly load one product page in headless Chromium and click a link.

    Each call to :meth:`get_html` launches a fresh browser through a proxy,
    opens ``self.url``, clicks one mall link in the price table, scrolls a
    little and closes the browser again. :meth:`run` repeats that forever.
    """

    def __init__(self):
        """Store the target URL and a dict of request headers."""
        self.url = "https://search.shopping.naver.com/catalog/19374887080?query=ADATA%20HD330%20USB3.1&NaPm=ct%3Dl4i53xi8%7Cci%3D7610a9621d487f37cd7a8e1cf287613ee02f1b25%7Ctr%3Dslsl%7Csn%3D95694%7Chk%3D45f91bfab8555ace9e5d7236982a0e7fffc115b0"
        # NOTE(review): nothing in this class reads self.headers; get_html
        # sets a fixed user agent on the page instead. Kept for callers that
        # may use the attribute.
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "pragma": "no-cache",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "none",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": ua.random,
        }

    async def get_html(self, i, num):
        """Run one visit: launch a browser, load the page, click, scroll, close.

        Args:
            i: Iteration counter; only used in the log message.
            num: Label of the calling worker process; only used in the log
                message.

        Errors raised while navigating or interacting with the page are
        logged and swallowed. Errors raised while launching the browser or
        preparing the page propagate to the caller. The browser is closed
        exactly once in every case where it was launched.
        """
        # NOTE(review): every worker launches with the same userDataDir;
        # confirm Chromium tolerates a profile directory shared between
        # concurrently running instances.
        browser = await launch(
            headless=True,  # set to False to watch the browser while debugging
            dumpio=True,
            autoClose=True, devtools=False,
            userDataDir=r"D:\temporary",
            args=['--disable-infobars',
                  # '--no-sandbox',
                  '--proxy-server=u6213.20.tn.16yun.cn:****',
                  '--disable-setuid-sandbox',
                  '--window-size=1366,850',
                  ]
        )
        try:
            # Page preparation sits inside the try/finally so that a failure
            # here no longer leaves the Chromium process running.
            page = await self._prepare_page(browser)
            try:
                await self._visit(page, i, num)
            except Exception as e:
                # logger.exception keeps the traceback, which the file sink's
                # backtrace/diagnose options are configured to render.
                logger.exception(e)
        finally:
            # Single close point; the success path must not close separately.
            await browser.close()

    async def _prepare_page(self, browser):
        """Open a new tab and set its viewport, credentials and user agent."""
        page = await browser.newPage()
        await page.setViewport({'width': 1366, 'height': 768})
        # Presumably the credentials for the proxy passed via --proxy-server.
        await page.authenticate({
            'username': '******',
            'password': '******'
        })
        await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36')
        return page

    async def _visit(self, page, i, num):
        """Load self.url and, if the expected title is shown, click and scroll."""
        await page.goto(self.url, {'timeout': 30 * 1000})
        page_title = await page.title()
        if "ADATA HD330 USB3.1" not in page_title:
            # Unexpected page (title does not match): nothing to click.
            return

        # Click the specific element: the link in the 4th row of the table.
        selector = '#__next > div > div.style_container__3iYev > div.style_inner__1Eo2z > div.style_content_wrap__2VTVx > div.style_content__36DCX > div > div.summary_info_area__3XT5U > div.condition_area > table > tbody > tr:nth-child(4) > td.productByMall_mall_area__1oEU_ > div > a'
        await page.click(selector)
        await asyncio.sleep(2)
        await page.evaluate('window.scrollTo(0,200)')
        await page.evaluate('window.scrollTo(200,400)')
        await page.evaluate('window.scrollTo(400,800)')
        logger.info("进程{}:刷新{}次".format(num, i))
        # Non-blocking pause; a blocking sleep would stall the event loop.
        await asyncio.sleep(2)

    def run(self, num):
        """Call :meth:`get_html` in an endless loop; never returns.

        Args:
            num: Label of this worker, forwarded to :meth:`get_html`.

        Any exception escaping a visit is logged and the loop continues with
        the next iteration.
        """
        # Create one event loop for this worker and reuse it, instead of
        # asking for an implicit current loop on every iteration.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        i = 1
        while True:
            try:
                loop.run_until_complete(self.get_html(i, num))
            except Exception as e:
                logger.exception(e)
            i += 1

if __name__ == '__main__':
    # Single-process alternative for debugging: Spider().run(2)
    # Start five worker processes; each gets its own Spider instance and a
    # string label "0".."4" that run() forwards into the log messages.
    for worker_label in map(str, range(5)):
        worker = Process(target=Spider().run, args=(worker_label,))
        worker.start()

发表回复

您的电子邮箱地址不会被公开。