IP代理池的搭建方法

学习目标:

  • 了解 使用代理的原因
  • 掌握 免费代码的采集和测试
  • 熟悉 付费代理的获取流程

一、免费ip的采集和使用

1、使用ip的作用和原因

  • 首先代理ip可以保护用户信息的安全。在如今的大数据互联网时代,每个人上网总会留下一点信息,很有可能被别人利用,而使用代理ip可以完美解决这个问题。高匿名代理ip可以隐藏用户的真实ip地址,保护用户的个人数据和信息安全,提高用户上网的安全性。
  • 其次可以提高访问速度,有时出现过访问网页时出现卡顿的问题,通过代理ip一定程度上可以解决这个问题。通过代理IP访问的一些网站等信息会存留在代理服务器的缓冲区内,假如别人访问过的信息你再访问,则会直接在缓冲区内拉取数据,进一步提高访问速度。遇到对ip检测比较严格的网址也需要进行替换。
  • 代理区分:https://www.zhihu.com/question/442503446/answer/2516935440

2、免费ip网址

  • 可以从以下网址中采集到批量的ip地址

    西刺代理:http://www.xicidaili.com
    
    快代理:https://www.kuaidaili.com
    
    云代理:http://www.ip3366.net
    
    无忧代理:http://www.data5u.com/
    
    360 代理:http://www.swei360.com
    
    66ip 代理:http://www.66ip.cn
    
    ip 海代理:http://www.iphai.com
    
    大象代理:http://www.daxiangdaili.com/
    
    米扑代理:https://proxy.mimvp.com/freesecret.php
    
    站大爷:http://ip.zdaye.com/
    
    讯代理:http://www.xdaili.cn/
    
    蚂蚁代理:http://www.mayidaili.com/free
    
    89免费代理:http://www.89ip.cn/
    
    全网代理:http://www.goubanjia.com/buy/high.html
    
    开心代理:http://www.kxdaili.com/dailiip.html
    
    猿人云:https://www.apeyun.com/
    

3、采集ip数据和测试

  • 我们在网站上采集的免费ip能使用率不足20%,所以我们采集的ip需要先进行测试,测试完之后存放到文本,或者数据库
  • 测试ip的网站
http://httpbin.org/ip
https://www.ipchicken.com/
https://www.ip2location.com/demo
import time
import requests
import re


class DaiLi(object):
    def __init__(self):
        self.base_URL = 'https://www.kuaidaili.com/free/inha/{}/'
        self.test_URL = 'http://httpbin.org/ip'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.35'}

    def get_response(self, url, proxies=None):
        try:
            response = requests.get(url=url, headers=self.headers, proxies=proxies)
            # time.sleep(0.5)
            return response.text
        except Exception as f:
            print('请求有误,错误为:%s' % f)

    def get_url(self, page):
        url = self.base_URL.format(page)
        return url

    def xpath_html(self, html):

        datas = re.findall('"ip": "(.*?)", "last_check_time": ".*?", "port": "(.*?)",', html)
        # print(datas)
        DiZhi = []
        for i in datas:
            dizhi = {'IP': i[0], 'DuanKou': i[1]}
            DiZhi.append(dizhi)
        return DiZhi

    def save_data(self, data):
        IPS = []
        for i in data:
            proxies = {'http': 'http://' + i['IP'] + ':' + i['DuanKou']}
            try:
                print(proxies)
                response = requests.get(url=self.test_URL, headers=self.headers, proxies=proxies, timeout=2)

                if response.status_code == 200:
                    print(response.status_code)
                    print(response.text)
                    IPS.append(proxies)

            except Exception:
                print('超时链接')
        print(IPS)

    def run(self):
        for i in range(1, int(input('输入爬取页码')) + 1):
            url = self.get_url(i)
            html = self.get_response(url)
            data = self.xpath_html(html)
            self.save_data(data)


if __name__ == '__main__':
    dl = DaiLi()
    dl.run()

注意: 免费ip采集存放到ip池之后,每次使用的时候也需要测试也会有过期的情况

二、付费ip的使用

  • 两着的优缺点:
    • 免费:除了免费,没有优点
    • 付费:除了付费,没有缺点

1、使用方法

  • 快代理:https://www.kuaidaili.com/

  • 找到自己喜欢的付费ip代理平台(案列:快代理,不同的代理平台都有对应文档和客服)

    • 在平台注册一个账户

    • 根据自己的项目选择购买的套餐

    • 购买之后点击账户管理 – 》我的订单 --》点击更多 – 》点击提取ip --》点击生成api链接

    • 根据项目需求修改你需要的提取数量,返回的格式和分割符

    • 请求生成的接口就能获取到对应的ip数据

三、实战案列

import threading
import time

import requests
import pymysql
from queue import Queue
from feapder.network.user_agent import get
from lxml import etree
from retrying import retry
from loguru import logger

class ChaXun():
    def __init__(self):
        self.db = pymysql.connect(host="localhost", user="root", password="root", db="spiders16")
        self.cursor = self.db.cursor()
        self.index_url = 'https://qq.ip138.com/train/'
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        }
        self.ip_url = 'https://dps.kdlapi.com/api/getdps/?secret_id=ouy6zuogcnb6t516855l&signature=ousxp9hrflvxj7vc19kjrxkrmw&num=1&pt=1&format=text&sep=1'
        # 用户名密码认证(私密代理/独享代理)
        self.username = "d2952905742"
        self.password = "cixv0obv"
        self.ip_queue = Queue()
        self.province_url_queue = Queue()
        self.detail_url_queue = Queue()
        self.data_queue = Queue()

    def create_table(self):
        # 使用预处理语句创建表
        sql = '''
                    CREATE TABLE IF NOT EXISTS huoche(
                        id int primary key auto_increment not null,
                        train_number VARCHAR(255) NOT NULL, 
                        Train_type VARCHAR(255) NOT NULL,
                        starting_station VARCHAR(255) NOT NULL,
                        way_station VARCHAR(255) NOT NULL,
                        destination VARCHAR(255) NOT NULL
                        )
                    '''
        try:
            self.cursor.execute(sql)
            print("CREATE TABLE SUCCESS.")
        except Exception as ex:
            print(f"CREATE TABLE FAILED,CASE:{ex}")

    def get_ip(self):
        """
        获取ip数据,将ip数据放进队列
        :return: ip地址
        """
        while True:
            if self.ip_queue.empty():
                response = requests.get(self.ip_url)
                # print(response.text)
                self.ip_queue.put(response.text)
            else:
                continue

    # 最大重试3次,3次全部报错,才会报错
    @retry(stop_max_attempt_number=3)
    def get_data(self, url):
        ip = self.ip_queue.get()
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {'user': self.username, 'pwd': self.password, 'proxy': ip},
            "https": "https://%(user)s:%(pwd)s@%(proxy)s/" % {'user': self.username, 'pwd': self.password, 'proxy': ip}
        }

        print(ip)
        self.headers['User-Agent'] = get()
        response = requests.get(url=url, headers=self.headers, timeout=2, proxies=proxies, verify=False)
        # response = requests.get(url=url, headers=self.headers, timeout=2, verify=False)
        response.encoding = 'utf-8'
        if response.status_code == 200:
            self.ip_queue.put(ip)
        else:
            raise ('状态码错误')
        return response

    def get_province_url(self):
        response = self.get_data(self.index_url)
        html = etree.HTML(response.text)
        dd_list = html.xpath('//div[@class="list"]/dl/dd')
        for dd in dd_list[:2]:
            href = 'https://qq.ip138.com' + dd.xpath('./a/@href')[0]
            self.province_url_queue.put(href)

    def detail_url_get(self):
        while True:
            province_url = self.province_url_queue.get()
            try:
                response = self.get_data(province_url)
            except Exception as e:
                logger.error('异常省份网址:' + province_url)
                continue
            html = etree.HTML(response.text)
            li_list = html.xpath('//div[@class="module mod-list"]/div[@class="bd"]/div/div[2]/ul/li')
            for li in li_list:
                item = {}
                item['to_city'] = li.xpath('./a/text()')[0]
                item['to_city_href'] = 'https://qq.ip138.com' + li.xpath('./a/@href')[0]
                self.detail_url_queue.put(item)
            self.province_url_queue.task_done()

    def paras_data(self):
        while True:
            detail_url = self.detail_url_queue.get()

            try:
                response = self.get_data(detail_url['to_city_href'])
            except Exception as e:
                logger.error('异常市区网址:' + detail_url['to_city_href'])
                continue
            html = etree.HTML(response.text)
            tr_list = html.xpath('//div[@class="table-inner"]/table/tbody/tr')
            for tr in tr_list:
                train_number = tr.xpath('./td[1]/a/b/text()')[0]
                Train_type = tr.xpath('./td[2]/text()')[0]
                starting_station = tr.xpath('./td[3]/a/text()')[0]
                way_station = tr.xpath('./td[5]/a/text()')[0]
                destination = tr.xpath('./td[8]/a/text()')[0]
                print(train_number, Train_type, starting_station, way_station, destination)
                self.data_queue.put((train_number, Train_type, starting_station, way_station, destination))
            self.detail_url_queue.task_done()

    def save_data(self):
        while True:
            self.data_list = []
            for i in range(30):
                data = self.data_queue.get()
                self.data_list.append((0,) + data)
                self.data_queue.task_done()

            # SQL 插入语句
            sql = 'INSERT INTO huoche(id, train_number, Train_type, starting_station, way_station, destination) values(%s, %s, %s, %s, %s, %s)'
            # 执行 SQL 语句
            try:

                self.cursor.executemany(sql, self.data_list)
                # 提交到数据库执行
                self.db.commit()
                print('数据插入成功...')
            except Exception as e:
                print(f'数据插入失败: {e}')
                # 如果发生错误就回滚
                self.db.rollback()


    def main(self):
        self.create_table()
        t_list = []
        # 获取ip线程
        t_ip = threading.Thread(target=self.get_ip)
        t_list.append(t_ip)
        # 获取分类线程
        t_info = threading.Thread(target=self.get_province_url)
        t_list.append(t_info)
        # 获取详细商品线程
        for i in range(1):
            t_detail_url = threading.Thread(target=self.detail_url_get)
            t_list.append(t_detail_url)
        # 解析数据线程
        for i in range(1):
            t_paras = threading.Thread(target=self.paras_data)
            t_list.append(t_paras)
        # 保存数据线程
        t_save = threading.Thread(target=self.save_data)
        t_list.append(t_save)

        for t in t_list:
            t.setDaemon(True)
            t.start()

        time.sleep(4)

        for q in [self.province_url_queue, self.detail_url_queue, self.data_queue]:
            q.join()

        print('子线程结束!!!')

        if self.data_list:
            # SQL 插入语句
            sql = 'INSERT INTO huoche(id, train_number, Train_type, starting_station, way_station, destination) values(%s, %s, %s, %s, %s, %s)'
            # 执行 SQL 语句
            try:
                # print(sql, (0, data[0], data[1], data[2], data[3]))
                self.cursor.executemany(sql, self.data_list)
                # 提交到数据库执行
                self.db.commit()
                print('数据插入成功...')
            except Exception as e:
                print(f'数据插入失败: {e}')
                # 如果发生错误就回滚
                self.db.rollback()



if __name__ == '__main__':
    # 文件过大于500M就会重新生成一个文件
    logger.add("runtime_{time}.log", rotation="500 MB")
    hc = ChaXun()
    hc.main()