DEV Community

drake
drake

Posted on

百度指数爬虫

获取各个省份,各个关键词的数据,整合成一张csv表


"""
百度指数爬虫 2025年3月
"""
import json
from traceback import format_exc
import time

import requests
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import random
from requests.exceptions import RequestException

# 从浏览器中复制
cookies_dict = {
    "BAIDUID_BFESS": ":FG=1",
    "Hm_lvt_d101ea4d2a5c67dab98251f0b5de24dc": "1740900295",
    "HMACCOUNT": "70309D756AB7564A",
    "ppfuid": "/69CStRUAcn/QmhIlFDxPrAc/s5tJmCocrihdwitHd04Lvs3Nfz26Zt2holplnIKVacidp8Sue4dMTyfg65BJnOFhn1HthtSiwtygiD7p=",
    "BDUSS": "",
    "SIGNIN_UC": "70a2711cf1d3d9b1a82d2f87d633bd8a04909129477oUCDVgDuFIWQq0I5Qh%%%2FMKxlLMEZywpREcfeqkBuNDImGT1swOphjUr0m7yoFRuoRONhZO0DhIUp8qMp%2BI%2BGZ9URB2%2FDv3g%2FwZ0nXnjrScjtkdIga7hBOF4Os4RsqXflHU7INYd10uoQ2Ecn99qPwcD5%2BuKJ7%2BtRR94%3D59476045678983651647832308115528",
    "__cas__rn__": "490912947",
    "__cas__st__212": "",
    "__cas__id__212": "40927145",
    "CPTK_212": "1776632285",
    "CPID_212": "40927145",
    "Hm_lpvt_d101ea4d2a5c67dab98251f0b5de24dc": "1740900504",
    "BDUSS_BFESS": "",
    "bdindexid": "jcohnli710phtu4po08tnl0o33",
    "ab_sr": "==",
    "RT": "zzl"
}

credential = {
    "cipherText": "1740888411355_1740901282164_aLrE9Za0dpKtlO3CQw1IR/Yz3hP8cGXzHI/2BnYqUk5XRMPS4pr5kfk3slC7+G60AS9KjhhlCPNuQnqbFhpZS9Z7MUVTxUeQ8XlgGhrmV+FapK3+nQuTdrm1pz8Jy5qhWO0pOhQyUqv/AR5RFI0hKsasKjMYDQfng+XPMhygTo0rCb3PLrFDflBQ1riNlJ7Bg8s6TfsE3OMaJPAQsjhaZlZO1bXUAhFIY0EMqIxq2DAkMVEatrHKmDbkb0f2NJw988jZkhDEZTAJ06iAXqSLbKnbF0bPCUIqaT/a5yeqr2KtCwbJYH4flHQSoThN40a6t/XiyTqUc1Mdds6w27Q/qOyR+nPe8978fEsEB3UssJ9LPc62xsjzLmY1x5qH7eA/j7eJAgbbWVvYW8H/4N3iaauKg0D1F8NqUHMGoGVpAQSj0/HLx5pUebCoFBVBnbA2kMYD8kvavD1WzPEMte2sp2uhlSGB4IIDMkqz13eaIsc=",
    "cookie_BDUSS": cookies_dict['BDUSS']
}

def generate_http_headers(credential):
    http_headers = {
        'Cookie': 'BDUSS=' + credential["cookie_BDUSS"],
        'Cipher-Text': credential["cipherText"],
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Referer': 'https://index.baidu.com/v2/main/index.html',
        'Host': 'index.baidu.com',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    return http_headers


def calculate_yearly_averages(start_date, end_date, data_series):
    # Convert the start and end dates to datetime objects
    start = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')
    days_span = (end - start).days + 1

    # Split the data series into a list and replace empty strings with '0'
    data_points = data_series.split(',')
    data_points = ['0' if point == '' else point for point in data_points]
    data_points = np.array(data_points, dtype=float)

    if days_span <= 366:
        dates = pd.date_range(start, periods=len(data_points))
    else:
        weeks_span = len(data_points)
        dates = pd.date_range(start, periods=weeks_span, freq='W')

    # Create a DataFrame with the dates and data points
    df = pd.DataFrame({'Date': dates, 'Data': data_points})
    df.set_index('Date', inplace=True)

    # Calculate the yearly average
    yearly_averages = df.resample('YE').mean().reset_index()
    yearly_averages['Year'] = yearly_averages['Date'].dt.year
    yearly_averages.drop('Date', axis=1, inplace=True)
    yearly_averages.rename(columns={'Data': 'Average'}, inplace=True)
    # Convert DataFrame to list of tuples (year, average)
    yearly_averages_list = list(yearly_averages.itertuples(index=False, name=None))
    print(yearly_averages_list)

    return yearly_averages_list


# 解密
def decrypt(ptbk, index_data):
    n = len(ptbk) // 2
    a = dict(zip(ptbk[:n], ptbk[n:]))
    return "".join([a[s] for s in index_data])


def keywords2json(keyword):
    import json
    converted_keywords = [[{"name": keyword, "wordType": 1}]]
    # Convert the list of lists of dictionaries into a JSON string
    json_string = json.dumps(converted_keywords, ensure_ascii=False)
    print(json_string)
    return json_string

#
# def namely(keywords):
#     return '+'.join(keywords)


def crawl_request(keyword, startDate, endDate, regionCode, credential, expectedInterval, autoSave, regionName, data_combine):
    print('正在查询:', keyword, startDate, endDate, regionCode)
    words = keywords2json(keyword)

    # 第一级以逗号分隔,第二级以加号分隔
    testwordset = keyword
    max_retries = 3  # 最大重试次数
    retries = 0  # 当前重试次数

    while retries < max_retries:
        try:
            url = f'https://index.baidu.com/api/AddWordApi/checkWordsExists?word={testwordset}'
            rsp = requests.get(url, headers=generate_http_headers(credential), timeout=10).json()
            # 若data的result不为空,则说明关键词不存在,报错并退出
            if rsp['data']['result']:
                print(f'{testwordset}关键词不存在或组合里有不存在的关键词,请检查')
                return -1

            url = f'http://index.baidu.com/api/SearchApi/index?area=0&word={words}&area={regionCode}&startDate={startDate}&endDate={endDate}'
            rsp = requests.get(url, headers=generate_http_headers(credential), timeout=10).json()

            # 获取解密秘钥
            data = rsp['data']['userIndexes']
            uniqid = rsp['data']['uniqid']
            url = f'https://index.baidu.com/Interface/ptbk?uniqid={uniqid}'
            ptbk = requests.get(url, headers=generate_http_headers(credential), timeout=10).json()['data']

            # 数据解密
            res = [0 for _ in range(len(data))]
            for i in range(len(data)):
                index_data = decrypt(ptbk, data[i]['all']['data'])
                yearly_averages = calculate_yearly_averages(startDate, endDate, index_data)
                for tuple_item in yearly_averages:
                    index_d = round(tuple_item[0],2)
                    year = tuple_item[1]
                    if year > 2022:
                        continue
                    if year in data_combine:
                        data_combine[year].append(index_d)
                    else:
                        data_combine[year] = [year, regionName, index_d]
            return res
        except Exception as e:
            print(f'请求失败,错误信息:{e}')
            retries += 1
            print(f'重试第{retries}次...')
            time.sleep(random.randint(1, 3))  # 在重试前等待一段时间
    if retries == max_retries:
        print(f'请求失败次数过多,已达到最大重试次数{max_retries},跳过当前连接')
        return -1


# regions = {}
provinces = {
            901: "山东",
            902: "贵州",
            903: "江西",
            904: "重庆",
            905: "内蒙古",
            906: "湖北",
            907: "辽宁",
            908: "湖南",
            909: "福建",
            910: "上海",
            911: "北京",
            912: "广西",
            913: "广东",
            914: "四川",
            915: "云南",
            916: "江苏",
            917: "浙江",
            918: "青海",
            919: "宁夏",
            920: "河北",
            921: "黑龙江",
            922: "吉林",
            923: "天津",
            924: "陕西",
            925: "甘肃",
            926: "新疆",
            927: "河南",
            # 928: "安徽",
            929: "山西",
            930: "海南",
            931: "台湾",
            # 932: "西藏",
            933: "香港",
            934: "澳门"
        }

regions = provinces

def crawl(regionCode, credential, expectedInterval, autoSave, regionName, data_combine):
    # 获取11年到22年的数据
    startDate = '2011-01-01'
    endDate = '2022-12-31'
    # 清洗关键词
    keywords = ['第三方支付', '在线支付', '移动支付', '网贷', '互联网理财', '互联网保险', '在线理财', '电子银行', '网银', '大数据', '云计算', '人工智能', '区块链', '生物识别']

    # res = {regionCode: []}
    for keyword in keywords:
        if regionCode != '999':
            try:
                crawl_request(keyword, startDate, endDate, regionCode, credential, expectedInterval, autoSave, regionName, data_combine)
            except:
                print(format_exc())
            # res[regionCode].extend(t)
            # 每次查询后休息一到五秒,实际上在账号很多的情况下,这个时间可以缩短
            time.sleep(expectedInterval / 1000 + random.randint(1, 3) / 2)
if __name__ == '__main__':
    import csv
    # # 清洗关键词
    # titles = ['年份', '区域', '第三方支付', '在线支付', '移动支付', '网贷', '互联网理财', '互联网保险', '在线理财', '电子银行', '网银',
    #             '大数据', '云计算', '人工智能',
    #             '区块链', '生物识别']
    # with open('combine.csv', 'a', encoding='utf-8-sig', newline='') as csvfile:
    #     writer = csv.writer(csvfile)
    #     writer.writerow(titles)

    for regionCode in regions:
        # regionCode = 928
        # regionName = '安徽'
        regionName = regions[regionCode]
        data_combine = {}


        crawl(regionCode, credential, 10, True, regionName, data_combine)
        data_list = []
        for i in data_combine:
                data_list.append(data_combine[i])
        with open('combine.csv', 'a', encoding='utf-8-sig', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerows(data_list)

Enter fullscreen mode Exit fullscreen mode

Top comments (0)