Python Queue Data Storage/Retrieval and Multi-threaded (Thread) Crawler, Part 2: Advanced

Overview:

When learning, the most important thing is to tie everything together!
I spent an evening writing a script that crawls magnet links for a given keyword and automatically stores them in SQLite, making simple use of a queue and multi-threading. The optimized version is shared and recorded here.
It uses 7 threads in total: 1 main thread plus 6 child threads that do the crawling, following the pattern sketched below.
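
Before the full script, here is a minimal sketch of that pattern: page numbers go into a Queue, and the worker threads pull from it until it is empty. The names and page range here are illustrative only, not part of the script below.

import threading
from queue import Queue, Empty

def worker(page_queue):
    while True:
        try:
            page = page_queue.get(block=False)  # take the next page number
        except Empty:
            break                               # queue drained: this worker exits
        print(f"crawling page {page}")          # the real crawl logic goes here

page_queue = Queue()
for i in range(1, 11):                          # pages 1..10 (illustrative)
    page_queue.put(i)

threads = [threading.Thread(target=worker, args=(page_queue,)) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()                                    # the main thread waits for all 6 workers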

Local environment:

  • Oracle 11g 64-bit
  • python 3.7.9
  • windows 10
  • pycharm

Complete code:

# -*- coding: utf-8 -*-
# @Time: 2020/12/19 15:26
# @Author: zh
# @File: cilipa_spider.py
# @Software: PyCharm
# ps: crawl magnet links for a given keyword -- multi-threaded

import sqlite3
import time
import threading
from queue import Queue
from bs4 import BeautifulSoup
import re
import requests

CRAWL_EXIT = False
CRAWL_NULL = False
# Regex: strip HTML tags from a line of text
reLabel = re.compile(r'<[^>]+>', re.S)
dataList = []
count = 0
pages_number = 0
headers={
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"
}
# Proxy settings
proxies = {
    # 'https': 'https://155.94.xxx.xx:7777',  # the HTTPS port your proxy exposes locally
    'http': 'http://155.94.xxx.xx:7777',  # the HTTP port your proxy exposes locally
}
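# Note: proxies is defined above but never passed to the requests.get() calls below;
# add proxies=proxies to those calls if you actually want to route traffic through the proxy.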
class ThreadCrawl(threading.Thread):
    # Initialization
    def __init__(self, thread_name, page_queue, data_queue, params):
        # threading.Thread.__init__(self)
        # call the parent class initializer
        super(ThreadCrawl, self).__init__()
        self.threadName = thread_name
        self.page_queue = page_queue
        self.data_queue = data_queue
        self.params = params
        self.result = []

    # Start crawling
    def run(self):
        while not CRAWL_EXIT:
            try:
                # when the queue is empty, get() raises and crawling is done
                page = self.page_queue.get(block=False)  # take the next page number from the queue
                # target URL
                url = "http://clb.biz/s/%s_rel_%d.html" % (self.params, page)
            except:
                break

            global pages_number
            pages_number = page

            timeout = 4  # retry up to 4 times here; if every attempt fails, move on to the next page
            while timeout > 0:
                timeout -= 1
                try:
                    # crawling logic starts here
                    rep = requests.get(url, headers=headers, timeout=5)
                    if rep.status_code == 200:
                        html = rep.content.decode('utf-8')
                        soup = BeautifulSoup(html, 'html.parser')
                        # check whether this page still has results
                        if len(soup.find_all('div', class_="search-item")) == 0:
                            # print("已到最后页无数据,结束程序!")
                            # 如果page_queue为空,采集线程退出循环
                            # global CRAWL_NULL
                            # CRAWL_NULL = True
                            break
                        for file in soup.find_all('div', class_="search-item"):
                            file = str(file)
                            datas = []
                            url_link = re.findall(re.compile(r'<a href="(.*?)"'), file)[0]
                            name = re.findall(re.compile('<a .* target="_blank">(.*?)</a>'), file)[0]
                            name = reLabel.sub('', name)
                            type = re.findall(re.compile(r'<span class="cpill .*">(.*?)</span>'), file)[0]
                            creat_time = re.findall(re.compile(r'<span>创建时间: <b>(.*?)</b></span>'), file)[0]
                            file_size = re.findall(re.compile(r'<b class="cpill yellow-pill">(.*?)</b>', re.S), file)
                            if len(file_size) == 0:
                                break
                            down_hot = re.findall(re.compile(r'<span>下载热度:<b>(.*?)</b></span>'), file)[0]
                            url_link = url[:15] + url_link

                            # fetch the detail page
                            detail = requests.get(url_link, headers=headers, timeout=5)
                            detail_html = detail.content.decode("utf-8")
                            detail_soup = BeautifulSoup(detail_html, "html.parser")
                            cili_url = ''
                            for data in detail_soup.find_all('div', class_="fileDetail"):
                                cili_url = re.findall(re.compile(r'<a class="download" href="(.*?)"', re.S), str(data))[0]
                                # print(f"第{page}页",name, url_link, type, creat_time, file_size[0], down_hot, cili_url,'\n')
                                global count
                                count += 1
                            datas.append(name)
                            datas.append(type)
                            datas.append(file_size[0])
                            datas.append(down_hot)
                            datas.append(str(creat_time).replace("-", ""))
                            datas.append(cili_url)
                            datas.append(url_link)
                            # url_link_id = str(url_link_id).replace("/","")
                            # url_link_id = str(url_link_id).replace(".html", "")
                            datas.append(cili_url[20:])
                            dataList.append(datas)
                            # success: stop the retry loop
                            timeout = 0
                            print(datas,'\n')
                        # break
                    else:
                        print("未请求到网站内容,请检查url")
                except Exception as e:
                    break
        # append the collected data to this thread's result set
        self.result.append(dataList)


    # Return the full crawl result set held by the class
    def get_result(self):
        try:
            return self.result  # without join() on the child threads, self.result may not exist here yet
        except Exception as e:
            print("Exception:", e)
            return None


# Save the results to SQLite
def saveSqlite(dataList):
    try:
        conn = sqlite3.connect("My_Data.db")
        cur = conn.cursor()
        # name, type, creat_time, file_size[0], down_hot, cili_url,url_link
        sql = '''
            create table cili_data(
                id varchar(255) not null primary key ,
                name text,
                type varchar(50),
                file_size varchar(50),
                down_hot varchar(50),
                creat_time varchar(25),
                cili_url text,
                url_link text
            );
        '''
        try:
            # create the table
            cur.execute(sql)
            conn.commit()
        except Exception as e:
            print("Exception:", e)
            # if the table already exists, the exception is caught and ignored
            pass
        # build and execute the INSERT statements
        for data in dataList:
            for i in range(len(data)):
                data[i] = '"'+data[i] + '"'
            sql_save = '''insert into cili_data(name,type,file_size,down_hot,creat_time,cili_url,url_link,id)  values(%s)''' % ",".join(data)
            # print(sql_save)
            try:
                cur.execute(sql_save)
                conn.commit()
            except Exception as e:
                print("已存在数据跳过,异常:",e)
                pass
        print("此批已存完毕")
        cur.close()
        conn.close()
    except Exception as e:
        print("异常:",e)


def main(page_num,params):
    # declare a queue and fill it with the page numbers to crawl
    page_queue = Queue(page_num)
    for i in range(1, page_num + 1):
        page_queue.put(i)
    # queue for collected results (passed to the worker threads but not used in this version)
    data_queue = Queue()
    # list to keep track of the threads
    thread_crawl = []
    # start 6 crawler threads
    craw_list = ['Thread01','Thread02','Thread03','Thread04','Thread05','Thread06']

    # if not os.path.exists('download'):
    #     os.mkdir('download')

    for thread_name in craw_list:
        c_thread = ThreadCrawl(thread_name, page_queue,data_queue,params)
        c_thread.start()
        thread_crawl.append(c_thread)
    print('Started: ' + str(thread_crawl))
    # wait until page_queue is empty, i.e. all pages have been handed out
    while not page_queue.empty():
        pass
    # once page_queue is empty, tell the crawler threads to exit their loops
    global CRAWL_EXIT
    CRAWL_EXIT = True

    for t in thread_crawl:
        t.join() # join() is required; otherwise the main thread finishes before the children and the results are lost
        # print(t.get_result())


if __name__ == '__main__':
    params = input("请输入要搜索磁力的关键词:")
    page_num = input("请输入爬取页数:")
    start_time = time.perf_counter()
    main(int(page_num),params)
    end_time = time.perf_counter()
    endTime = (end_time - start_time).__str__()
    qr = input("爬取完毕,确认是否存入库(y/n):")
    if qr=='y':
        saveSqlite(dataList)
        end2_time = time.perf_counter()
        endTime2 = (end2_time - end_time).__str__()
        print(f"爬取耗时:{endTime[:5]}ms, 总共{pages_number}页,总计{count}条数据")
        print(f"保存数据入库完毕! 耗时:{endTime2[:5]}ms")
    else:
        print(f"爬取完毕! 耗时:{endTime[:5]}ms, 总共{pages_number}页,总计{count}条数据")

Results:
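
To inspect what was stored, you can query My_Data.db directly. This is a minimal sketch, with the table and column names taken from the schema created in saveSqlite:

import sqlite3

conn = sqlite3.connect("My_Data.db")
cur = conn.cursor()
cur.execute("select count(*) from cili_data")
print("rows stored:", cur.fetchone()[0])
# preview a few of the saved records
for row in cur.execute("select name, file_size, creat_time from cili_data limit 5"):
    print(row)
conn.close()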
