08-ip代理池搭建
IP代理池的搭建方法
学习目标:
- 了解 使用代理的原因
- 掌握 免费代码的采集和测试
- 熟悉 付费代理的获取流程
一、免费ip的采集和使用
1、使用ip的作用和原因
- 首先代理ip可以保护用户信息的安全。在如今的大数据互联网时代,每个人上网总会留下一点信息,很有可能被别人利用,而使用代理ip可以完美解决这个问题。高匿名代理ip可以隐藏用户的真实ip地址,保护用户的个人数据和信息安全,提高用户上网的安全性。
- 其次可以提高访问速度,有时出现过访问网页时出现卡顿的问题,通过代理ip一定程度上可以解决这个问题。通过代理IP访问的一些网站等信息会存留在代理服务器的缓冲区内,假如别人访问过的信息你再访问,则会直接在缓冲区内拉取数据,进一步提高访问速度。遇到对ip检测比较严格的网址也需要进行替换。
- 代理区分:https://www.zhihu.com/question/442503446/answer/2516935440
2、免费ip网址
-
可以从以下网址中采集到批量的ip地址
西刺代理:http://www.xicidaili.com 快代理:https://www.kuaidaili.com 云代理:http://www.ip3366.net 无忧代理:http://www.data5u.com/ 360 代理:http://www.swei360.com 66ip 代理:http://www.66ip.cn ip 海代理:http://www.iphai.com 大象代理:http://www.daxiangdaili.com/ 米扑代理:https://proxy.mimvp.com/freesecret.php 站大爷:http://ip.zdaye.com/ 讯代理:http://www.xdaili.cn/ 蚂蚁代理:http://www.mayidaili.com/free 89免费代理:http://www.89ip.cn/ 全网代理:http://www.goubanjia.com/buy/high.html 开心代理:http://www.kxdaili.com/dailiip.html 猿人云:https://www.apeyun.com/
3、采集ip数据和测试
- 我们在网站上采集的免费ip能使用率不足20%,所以我们采集的ip需要先进行测试,测试完之后存放到文本,或者数据库
- 测试ip的网站
http://httpbin.org/ip
https://www.ipchicken.com/
https://www.ip2location.com/demo
import time
import requests
import re
class DaiLi(object):
def __init__(self):
self.base_URL = 'https://www.kuaidaili.com/free/inha/{}/'
self.test_URL = 'http://httpbin.org/ip'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.35'}
def get_response(self, url, proxies=None):
try:
response = requests.get(url=url, headers=self.headers, proxies=proxies)
# time.sleep(0.5)
return response.text
except Exception as f:
print('请求有误,错误为:%s' % f)
def get_url(self, page):
url = self.base_URL.format(page)
return url
def xpath_html(self, html):
datas = re.findall('"ip": "(.*?)", "last_check_time": ".*?", "port": "(.*?)",', html)
# print(datas)
DiZhi = []
for i in datas:
dizhi = {'IP': i[0], 'DuanKou': i[1]}
DiZhi.append(dizhi)
return DiZhi
def save_data(self, data):
IPS = []
for i in data:
proxies = {'http': 'http://' + i['IP'] + ':' + i['DuanKou']}
try:
print(proxies)
response = requests.get(url=self.test_URL, headers=self.headers, proxies=proxies, timeout=2)
if response.status_code == 200:
print(response.status_code)
print(response.text)
IPS.append(proxies)
except Exception:
print('超时链接')
print(IPS)
def run(self):
for i in range(1, int(input('输入爬取页码')) + 1):
url = self.get_url(i)
html = self.get_response(url)
data = self.xpath_html(html)
self.save_data(data)
if __name__ == '__main__':
dl = DaiLi()
dl.run()
注意: 免费ip采集存放到ip池之后,每次使用的时候也需要测试也会有过期的情况
二、付费ip的使用
- 两着的优缺点:
- 免费:除了免费,没有优点
- 付费:除了付费,没有缺点
1、使用方法
-
找到自己喜欢的付费ip代理平台(案列:快代理,不同的代理平台都有对应文档和客服)
-
在平台注册一个账户
-
根据自己的项目选择购买的套餐
-
购买之后点击账户管理 – 》我的订单 --》点击更多 – 》点击提取ip --》点击生成api链接
-
根据项目需求修改你需要的提取数量,返回的格式和分割符
- 请求生成的接口就能获取到对应的ip数据
-
三、实战案列
-
采集目标:火车查询
import threading
import time
import requests
import pymysql
from queue import Queue
from feapder.network.user_agent import get
from lxml import etree
from retrying import retry
from loguru import logger
class ChaXun():
def __init__(self):
self.db = pymysql.connect(host="localhost", user="root", password="root", db="spiders16")
self.cursor = self.db.cursor()
self.index_url = 'https://qq.ip138.com/train/'
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
}
self.ip_url = 'https://dps.kdlapi.com/api/getdps/?secret_id=ouy6zuogcnb6t516855l&signature=ousxp9hrflvxj7vc19kjrxkrmw&num=1&pt=1&format=text&sep=1'
# 用户名密码认证(私密代理/独享代理)
self.username = "d2952905742"
self.password = "cixv0obv"
self.ip_queue = Queue()
self.province_url_queue = Queue()
self.detail_url_queue = Queue()
self.data_queue = Queue()
def create_table(self):
# 使用预处理语句创建表
sql = '''
CREATE TABLE IF NOT EXISTS huoche(
id int primary key auto_increment not null,
train_number VARCHAR(255) NOT NULL,
Train_type VARCHAR(255) NOT NULL,
starting_station VARCHAR(255) NOT NULL,
way_station VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL
)
'''
try:
self.cursor.execute(sql)
print("CREATE TABLE SUCCESS.")
except Exception as ex:
print(f"CREATE TABLE FAILED,CASE:{ex}")
def get_ip(self):
"""
获取ip数据,将ip数据放进队列
:return: ip地址
"""
while True:
if self.ip_queue.empty():
response = requests.get(self.ip_url)
# print(response.text)
self.ip_queue.put(response.text)
else:
continue
# 最大重试3次,3次全部报错,才会报错
@retry(stop_max_attempt_number=3)
def get_data(self, url):
ip = self.ip_queue.get()
proxies = {
"http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {'user': self.username, 'pwd': self.password, 'proxy': ip},
"https": "https://%(user)s:%(pwd)s@%(proxy)s/" % {'user': self.username, 'pwd': self.password, 'proxy': ip}
}
print(ip)
self.headers['User-Agent'] = get()
response = requests.get(url=url, headers=self.headers, timeout=2, proxies=proxies, verify=False)
# response = requests.get(url=url, headers=self.headers, timeout=2, verify=False)
response.encoding = 'utf-8'
if response.status_code == 200:
self.ip_queue.put(ip)
else:
raise ('状态码错误')
return response
def get_province_url(self):
response = self.get_data(self.index_url)
html = etree.HTML(response.text)
dd_list = html.xpath('//div[@class="list"]/dl/dd')
for dd in dd_list[:2]:
href = 'https://qq.ip138.com' + dd.xpath('./a/@href')[0]
self.province_url_queue.put(href)
def detail_url_get(self):
while True:
province_url = self.province_url_queue.get()
try:
response = self.get_data(province_url)
except Exception as e:
logger.error('异常省份网址:' + province_url)
continue
html = etree.HTML(response.text)
li_list = html.xpath('//div[@class="module mod-list"]/div[@class="bd"]/div/div[2]/ul/li')
for li in li_list:
item = {}
item['to_city'] = li.xpath('./a/text()')[0]
item['to_city_href'] = 'https://qq.ip138.com' + li.xpath('./a/@href')[0]
self.detail_url_queue.put(item)
self.province_url_queue.task_done()
def paras_data(self):
while True:
detail_url = self.detail_url_queue.get()
try:
response = self.get_data(detail_url['to_city_href'])
except Exception as e:
logger.error('异常市区网址:' + detail_url['to_city_href'])
continue
html = etree.HTML(response.text)
tr_list = html.xpath('//div[@class="table-inner"]/table/tbody/tr')
for tr in tr_list:
train_number = tr.xpath('./td[1]/a/b/text()')[0]
Train_type = tr.xpath('./td[2]/text()')[0]
starting_station = tr.xpath('./td[3]/a/text()')[0]
way_station = tr.xpath('./td[5]/a/text()')[0]
destination = tr.xpath('./td[8]/a/text()')[0]
print(train_number, Train_type, starting_station, way_station, destination)
self.data_queue.put((train_number, Train_type, starting_station, way_station, destination))
self.detail_url_queue.task_done()
def save_data(self):
while True:
self.data_list = []
for i in range(30):
data = self.data_queue.get()
self.data_list.append((0,) + data)
self.data_queue.task_done()
# SQL 插入语句
sql = 'INSERT INTO huoche(id, train_number, Train_type, starting_station, way_station, destination) values(%s, %s, %s, %s, %s, %s)'
# 执行 SQL 语句
try:
self.cursor.executemany(sql, self.data_list)
# 提交到数据库执行
self.db.commit()
print('数据插入成功...')
except Exception as e:
print(f'数据插入失败: {e}')
# 如果发生错误就回滚
self.db.rollback()
def main(self):
self.create_table()
t_list = []
# 获取ip线程
t_ip = threading.Thread(target=self.get_ip)
t_list.append(t_ip)
# 获取分类线程
t_info = threading.Thread(target=self.get_province_url)
t_list.append(t_info)
# 获取详细商品线程
for i in range(1):
t_detail_url = threading.Thread(target=self.detail_url_get)
t_list.append(t_detail_url)
# 解析数据线程
for i in range(1):
t_paras = threading.Thread(target=self.paras_data)
t_list.append(t_paras)
# 保存数据线程
t_save = threading.Thread(target=self.save_data)
t_list.append(t_save)
for t in t_list:
t.setDaemon(True)
t.start()
time.sleep(4)
for q in [self.province_url_queue, self.detail_url_queue, self.data_queue]:
q.join()
print('子线程结束!!!')
if self.data_list:
# SQL 插入语句
sql = 'INSERT INTO huoche(id, train_number, Train_type, starting_station, way_station, destination) values(%s, %s, %s, %s, %s, %s)'
# 执行 SQL 语句
try:
# print(sql, (0, data[0], data[1], data[2], data[3]))
self.cursor.executemany(sql, self.data_list)
# 提交到数据库执行
self.db.commit()
print('数据插入成功...')
except Exception as e:
print(f'数据插入失败: {e}')
# 如果发生错误就回滚
self.db.rollback()
if __name__ == '__main__':
# 文件过大于500M就会重新生成一个文件
logger.add("runtime_{time}.log", rotation="500 MB")
hc = ChaXun()
hc.main()
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果