

Python可乐 167



由于爬虫工作往往有大量数据需要爬取,便需要大量的备用IP用于更换,这时就需要用到代理IP池。将大量可以用于更换的代理IP汇聚到一起,便于管理和调用,IP池就这样产生了。IP池有以下特征:它里面的IP是持续补充的,会有源源不断的新IP被加入到池子中;它里面的IP是有生命周期的,一旦失效就会被清除出IP池;它里面的IP是可以被任意取出的,方便爬虫用户使用。








class BaseCrawler(object):
    """Base class for crawlers that fetch pages and yield proxies.

    Subclasses list the pages to fetch in ``urls`` and must provide a
    ``parse(html)`` method (not defined in this class); ``crawl`` yields
    whatever ``parse`` produces for each fetched page.
    """

    # Pages fetched by crawl(); empty here, to be filled in by subclasses.
    urls = []
    # Maximum number of fetch attempts _get_page makes per URL.
    _MAX_ATTEMPTS = 3
    # Class-level event loop used to drive the async fetch from the
    # synchronous crawl() generator.
    LOOP = asyncio.get_event_loop()
    asyncio.set_event_loop(LOOP)

    async def _get_page(self, url):
        """Fetch ``url`` and return the response body as text.

        The request is attempted up to ``_MAX_ATTEMPTS`` times.

        :param url: address of the page to download
        :return: the page text, or ``''`` if every attempt failed
        """
        # Retries are done in an explicit loop: a result-inspecting retry
        # decorator on a coroutine function only ever sees the un-awaited
        # coroutine object, so it can never trigger a retry.
        async with aiohttp.ClientSession() as session:
            for attempt in range(1, self._MAX_ATTEMPTS + 1):
                try:
                    async with session.get(url, timeout=10) as resp:
                        return await resp.text()
                except Exception as exc:
                    # ``Exception`` (not a bare except) keeps the fetch
                    # best-effort while letting task cancellation and
                    # KeyboardInterrupt propagate.
                    app_logger.debug(
                        f'fetching {url} failed on attempt {attempt}: {exc!r}'
                    )
        return ''

    def crawl(self):
        """Fetch every URL in ``urls`` and yield the parsed proxies.

        Each page is downloaded by running ``_get_page`` to completion on
        ``LOOP``; the resulting text (possibly ``''``) is handed to
        ``self.parse`` and every item it produces is yielded.
        """
        for url in self.urls:
            app_logger.info(f'fetching {url}')
            html = self.LOOP.run_until_complete(self._get_page(url))
            for proxy in self.parse(html):
                yield proxy




class Tester(object):
    """Tester that checks the proxies stored in Redis in batches."""

    def __init__(self):
        # Storage client holding the proxies to test.
        self.redis = RedisClient()
        # Event loop used by run() to drive each batch of async tests.
        self.loop = asyncio.get_event_loop()

    async def test(self, proxy):
        """Test a single proxy by requesting ``TEST_URL`` through it.

        If the response status is in ``TEST_VALID_STATUS`` the proxy is
        passed to ``self.redis.set_max``; for any other status, or when one
        of ``EXCEPTIONS`` is raised, it is passed to ``self.redis.decrease``.

        :param proxy: proxy address, used as ``http://{proxy}``
        :return: None
        """
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            try:
                app_logger.debug(f'testing {proxy}')
                async with session.get(TEST_URL, proxy=f'http://{proxy}', timeout=TEST_TIMEOUT,
                                       allow_redirects=False) as response:
                    if response.status in TEST_VALID_STATUS:
                        self.redis.set_max(proxy)
                        app_logger.debug(f'proxy {proxy} is valid, set max score')
                    else:
                        self.redis.decrease(proxy)
                        app_logger.debug(f'proxy {proxy} is invalid, decrease score')
            except EXCEPTIONS:
                self.redis.decrease(proxy)
                app_logger.debug(f'proxy {proxy} is invalid, decrease score')

    async def _test_batch(self, proxies):
        """Run ``test`` concurrently for every proxy in ``proxies``.

        Exceptions that ``test`` does not handle itself are collected rather
        than raised, so one failing proxy cannot abort the whole batch; each
        is logged.

        :param proxies: list of proxies to test
        :return: None
        """
        results = await asyncio.gather(
            *(self.test(proxy) for proxy in proxies),
            return_exceptions=True,
        )
        for proxy, result in zip(proxies, results):
            if isinstance(result, BaseException):
                app_logger.debug(f'testing proxy {proxy} raised {result!r}')

    def run(self):
        """Test all stored proxies, ``TEST_BATCH`` at a time.

        :return: None
        """
        app_logger.info('stating tester...')
        count = self.redis.count()
        app_logger.debug(f'{count} proxies to test')
        for i in range(0, count, TEST_BATCH):
            # start and end offsets of the current batch
            start, end = i, min(i + TEST_BATCH, count)
            app_logger.debug(f'testing proxies from {start} to {end} indices')
            proxies = list(self.redis.batch(start, end))
            if not proxies:
                # Nothing came back for this range; skip it.
                continue
            # A single wrapper coroutine is handed to the loop instead of
            # asyncio.wait(), which no longer accepts bare coroutines and
            # rejects an empty collection.
            self.loop.run_until_complete(self._test_batch(proxies))


__all__ = ['app']

app = Flask(__name__)


def get_conn():
    """Return the RedisClient stored as ``g.redis``, creating it if absent.

    :return: the object held in ``g.redis``
    """
    if hasattr(g, 'redis'):
        return g.redis
    g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    """Serve the home page; the markup can be swapped for your own template.

    :return: a short HTML welcome banner
    """
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """Serve one proxy picked by the storage client's ``random()``.

    :return: whatever ``random()`` returns
    """
    return get_conn().random()


@app.route('/count')
def get_count():
    """Serve the number of proxies reported by the storage client.

    :return: the result of ``count()`` converted to ``str``
    """
    total = get_conn().count()
    return str(total)


class Scheduler(object):
    """Start the tester, getter and API server, each in its own process."""

    def run_tester(self, cycle=CYCLE_TESTER):
        """Schedule the tester to run periodically.

        Does nothing but log when ``ENABLE_TESTER`` is false. Otherwise adds
        ``Tester().run`` to ``sch`` as an interval job and starts ``sch``.

        :param cycle: interval between runs, in seconds
        :return: None
        """
        if not ENABLE_TESTER:
            app_logger.info('tester not enabled, exit')
            return
        tester = Tester()
        sch.add_job(tester.run, 'interval', seconds=cycle, id='run_tester')
        sch.start()

    def run_getter(self, cycle=CYCLE_GETTER):
        """Schedule the crawler that replenishes proxies to run periodically.

        Does nothing but log when ``ENABLE_GETTER`` is false. Otherwise adds
        ``Getproxies().run`` to ``sch`` as an interval job and starts ``sch``.

        :param cycle: interval between runs, in seconds
        :return: None
        """
        if not ENABLE_GETTER:
            app_logger.info('getter not enabled, exit')
            return
        getter = Getproxies()
        sch.add_job(getter.run, 'interval', seconds=cycle, id='run_getter')
        sch.start()

    def run_server(self):
        """Start the web API server unless ``ENABLE_SERVER`` is false."""
        if not ENABLE_SERVER:
            app_logger.info('server not enabled, exit')
            return
        app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

    def run(self):
        """Start every enabled component in a child process and wait for them.

        On ``KeyboardInterrupt`` the started children are terminated. In all
        cases the started children are joined and their final state is
        logged. Components whose ``ENABLE_*`` flag is false are skipped
        throughout.
        """
        # The process handles stay module-level globals, as before; they are
        # reset to None so a disabled component can be recognised and
        # skipped instead of triggering a NameError during join/terminate.
        global tester_process, getter_process, server_process
        tester_process = getter_process = server_process = None
        try:
            app_logger.info('starting proxypool...')
            if ENABLE_TESTER:
                process = multiprocessing.Process(target=self.run_tester)
                # Start before logging: a Process has no pid until started.
                process.start()
                tester_process = process
                app_logger.info(f'starting tester, pid {tester_process.pid}...')
            if ENABLE_GETTER:
                process = multiprocessing.Process(target=self.run_getter)
                process.start()
                getter_process = process
                app_logger.info(f'starting getter, pid{getter_process.pid}...')
            if ENABLE_SERVER:
                process = multiprocessing.Process(target=self.run_server)
                process.start()
                server_process = process
                app_logger.info(f'starting server, pid{server_process.pid}...')
            for process in (tester_process, getter_process, server_process):
                if process is not None:
                    process.join()
        except KeyboardInterrupt:
            app_logger.info('received keyboard interrupt signal')
            for process in (tester_process, getter_process, server_process):
                if process is not None:
                    process.terminate()
        finally:
            started = [
                (label, process)
                for label, process in (
                    ('tester', tester_process),
                    ('getter', getter_process),
                    ('server', server_process),
                )
                if process is not None
            ]
            # must call join method before calling is_alive
            for _, process in started:
                process.join()
            for label, process in started:
                app_logger.info(f'{label} is {"alive" if process.is_alive() else "dead"}')
            app_logger.info('proxy terminated')

标签: #python实现http代理