XPath

import requests
from lxml import etree


def send_request():
    """Fetch the shuqi.com homepage.

    Returns:
        requests.Response: the HTTP response for the site root.
    """
    url = 'https://www.shuqi.com/'
    headers = {'user-agent':
                   'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'}
    # Bug fix: headers must be passed as a keyword argument; passed
    # positionally it becomes the `params` query string instead.
    return requests.get(url, headers=headers)


def extract_data(resp):
    """Parse the homepage HTML and collect the featured-book slides.

    Args:
        resp: an HTTP response object whose ``.text`` is the page HTML.

    Returns:
        list[dict]: one dict per book with keys 'name', 'author',
        'status' and 'img', all values stripped of whitespace.
    """
    e = etree.HTML(resp.text)
    img = e.xpath('//div[@class="common-slide"]/a/img/@src')
    name = e.xpath('//div[@class="book-info-container"]/a/text()')
    author = e.xpath('//div[@class="book-info-container"]/div[@class="author-name"]/text()')
    status = e.xpath('//div[@class="book-info-container"]/div[@class="book-tag"]/text()')
    fields = ['name', 'author', 'status', 'img']
    # zip the parallel XPath result lists and label each tuple's values;
    # a dict comprehension replaces the index-based inner loop.
    return [
        {key: value.strip() for key, value in zip(fields, book)}
        for book in zip(name, author, status, img)
    ]


if __name__ == '__main__':
    # Fetch the homepage and print the parsed book list.
    books = extract_data(send_request())
    print(books)

BeautifulSoup

中文文档

case1

from bs4 import BeautifulSoup

html = '''
    <html>
        <head></head>
        <body>
            <h1 class='hello'>hello world <!--HELLO WORLD--></h1>
            <div id='d1' class='c1'>
                <span>123</span>
                <span>456</span>
                <span>789</span>
            </div>
            <div id='d2' class='c2'>
                <h1>
                    <span>1232</span>
                    <span>4562</span>
                    <span>7892</span>
                </h1>
            </div>
        </body>
    </html>
'''
# Create the soup object (parsed with the lxml backend)
bs = BeautifulSoup(html, 'lxml')
# CSS selector: span children of h1 under the element with id "d2"
for t in bs.select('#d2>h1>span'):
    print(t.text)
# Find the div tag whose id is d2
print(bs.find('div', id='d2'))
# Get the value of the h1 tag's class attribute
print(bs.h1.attrs['class'])
# Get the text inside the tag
# (.text joins all child strings; .string may differ when the tag has
# multiple child nodes, as this h1 does — text plus an HTML comment)
print(bs.h1.text)
print(bs.h1.string)

case2

import dataclasses

import requests
from bs4 import BeautifulSoup

url = 'https://www.biqu68.com/book/33787/'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'}


# <video class="VideoTag_video__LcmT4" playsinline="" loop="" src="https://videos.pexels.com/video-files/4019911/4019911-sd_360_640_24fps.mp4" preload="none"><track kind="metadata" label="cuepoints" data-removeondestroy=""></video>
@dataclasses.dataclass
class Book():
    """Metadata fields scraped from a biqu68 book detail page."""
    author: str
    category: str
    status: str
    last_updatetime: str
    word_count: str


# Bug fix: headers must be passed as a keyword argument; passed
# positionally it becomes the `params` query string instead.
html = requests.get(url, headers=headers)
html.encoding = 'utf-8'

# Strip &nbsp; entities before parsing so the "label:value" split is clean.
soup = BeautifulSoup(html.text.replace('&nbsp;', ''), 'html.parser')

# Each <p> inside div.fix holds one "label:value" pair; skip the row
# containing '动作' (action links), which has no value part.
book_infos = soup.find('div', {'class': 'fix'}).find_all('p')
val_lst = [p.text.strip().split(':')[1] for p in book_infos if p.text.find('动作') == -1]
b = Book(val_lst[0], val_lst[1], val_lst[2], val_lst[3], val_lst[4])
print(b)

Json

from dataclasses import dataclass, asdict
import json

@dataclass
class User():
    """Simple user record serialized to/from JSON via dataclasses.asdict."""
    name: str
    age: int
    sex: int

if __name__ == '__main__':
    u = User(name='zs', age=12, sex=1)
    # u_str = json.dumps(asdict(u), ensure_ascii=False)
    # json.dump(asdict(u), open('user.txt', 'w'), ensure_ascii=False)
    # Use a context manager so the file handle is closed promptly instead
    # of leaking until garbage collection.
    with open('user.txt', 'r') as f:
        res = json.load(f)
    print(res)

CSV

# encoding='utf-8': sets the file content encoding
# newline='': prevents blank lines between rows on Windows
def write_data():
    """Append one row, then two more rows, to stu.csv."""
    with open('stu.csv', 'a+', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Jhone', 23, '0001', '男'])  # single-row write
        # Bug fix: rows must be ordered sequences; the original used set
        # literals ({...}), whose iteration order is arbitrary and would
        # scramble the column order.
        writer.writerows([['Jhone', 23, '0001', '男'], ['Henri', 10, '0002', '女']])  # multi-row write

def read_data():
    """Print every row of stu.csv, one list per line."""
    with open('stu.csv', mode='r', newline='', encoding='utf-8') as source:
        for record in csv.reader(source):
            print(record)

Excel

# Create a new workbook and write into the active sheet
wb = openpyxl.Workbook()
sheet = wb.active
cell = sheet['A1']
cell.value = '中国美丽'
# Write a single row of data
data = ['姓名', '年龄', '成绩']
sheet.append(data)
# Write multiple rows of data
data = [['张三', '26', '36'], ['Jone', '35', '52'], ['Henr', '66', '10']]
for row in data:
    sheet.append(row)
wb.save('test.xlsx')

# Reopen the workbook written above and read it back
wb = openpyxl.load_workbook('test.xlsx')
sheet = wb.active
# cell = sheet['A1']
# value = cell.value
# Read column A cell by cell
columns = sheet['A']
for col in columns:
    print(col.value)

# Read row 1 cell by cell (an integer index selects a row)
columns = sheet[1]
for row in columns:
    print(row.value)

# Slice selecting columns B through C
columns = sheet['B:C']

pymongo

import pymongo

# Get a connection to the local MongoDB server
client = pymongo.MongoClient('localhost', 27017)
# Select the database
db = client.school
# Get the collection (client.school.student — note `db` above refers to
# the same database, so db.student would be equivalent)
coll = client.school.student
# Query all documents and print them
res = coll.find()
for s in res:
    print(s)

Mysql

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker, backref

# Connection URL: MySQL via pymysql, user root, database `py`, utf8mb4 charset
db_url = 'mysql+pymysql://root:root@127.0.0.1:3306/py?charset=utf8mb4'
engine = create_engine(db_url)

# Declarative base for the ORM models and a module-level session
Base = declarative_base()
session = sessionmaker(bind=engine)()


class Person(Base):
    """ORM model for table t_person (one-to-one with Card via backref)."""
    __tablename__ = 't_person'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50))

    def __str__(self):
        return f'[Person[id={self.id}, name={self.name}]]'


class Card(Base):
    """ORM model for table t_card; `person` navigates to the owning Person."""
    __tablename__ = 't_card'
    id = Column(Integer, primary_key=True, autoincrement=True)
    card_number = Column(String(50))
    p_id = Column(Integer, ForeignKey('t_person.id'))
    # uselist=False makes the reverse attribute (person.id_card) scalar,
    # i.e. a one-to-one relationship.
    person = relationship('Person', backref=backref('id_card', uselist=False))

    def __str__(self):
        return f'Card[id = {self.id}, card_number = {self.card_number}, p_id = {self.p_id}]'


def create_table():
    """Drop and recreate all mapped tables. Destructive: existing rows are lost."""
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    session.commit()


def add_person():
    """Insert three sample Person rows."""
    people = [Person(name=who) for who in ('zs', 'ls', 'ww')]
    session.add_all(people)
    session.commit()


def add_card():
    """Insert three sample Card rows (not yet linked to a Person)."""
    cards = [Card(card_number=number) for number in ('1001', '1002', '1003')]
    session.add_all(cards)
    session.commit()


def edit_card():
    """Link the i-th card to person id i (1-based) and print each card."""
    cards = session.query(Card).all()
    # enumerate replaces the index-based range(len(...)) loop; p_id is an
    # Integer column, so assign an int rather than a str.
    for position, card in enumerate(cards, start=1):
        card.p_id = position
        print(card)
    session.commit()


def query_person():
    """Print every Person row."""
    for person in session.query(Person).all():
        print(person)


if __name__ == '__main__':
    # create_table()
    # add_person()
    # add_card()
    # edit_card()
    # Card.id is an Integer column, so compare against an int, not a str.
    card = session.query(Card).filter(Card.id == 1).first()
    print(card)
    print(card.person.name)

多线程

1. 锁

threading.Lock()

import threading
import random
import time
# Create the lock guarding the shared balance
lock = threading.Lock()
# Shared balance mutated by Producer and Consumer threads
g_money=0
class Producer(threading.Thread):
    """Earns a random amount 10 times, adding it to the shared balance."""

    def run(self):
        global g_money
        for _ in range(10):
            # `with` guarantees the lock is released even if the body
            # raises; the original manual acquire/release would leak the
            # lock on an exception.
            with lock:
                money = random.randint(1, 10)
                g_money += money
                print(threading.current_thread().name, '挣了{}钱,当前余额为{}钱'.format(money, g_money))
                time.sleep(1)
class Consumer(threading.Thread):
    """Tries to spend a random amount 10 times; skips when balance is short."""

    def run(self):
        global g_money
        for _ in range(10):
            # `with` guarantees the lock is released even if the body
            # raises; the original manual acquire/release would leak the
            # lock on an exception.
            with lock:
                money = random.randint(1, 10)
                if money <= g_money:
                    g_money -= money
                    print(threading.current_thread().name, '消费了{}钱,当前余额为{}钱'.format(money, g_money))
                else:
                    print(threading.current_thread().name, '准备消费{}钱,余额不足!当前余额为{}钱'.format(money, g_money))
                time.sleep(1)

def start():
    """Launch five producer and five consumer threads."""
    for index in range(5):
        Producer(name='生产者{}'.format(index)).start()
        Consumer(name='消费者{}'.format(index)).start()


if __name__ == '__main__':
    start()

threading.Condition()

import threading
import random
import time

# Create the condition variable guarding the shared state
lock = threading.Condition()
# Shared balance and a counter of completed production steps
g_money = 0
g_time = 0

class Producer(threading.Thread):
    """Earns money 10 times, waking waiting consumers after each deposit."""

    def run(self):
        global g_money
        global g_time
        for _ in range(10):
            # `with` acquires/releases the condition's underlying lock and
            # guarantees release even if the body raises.
            with lock:
                money = random.randint(1, 10)
                g_money += money
                g_time += 1
                print(threading.current_thread().name, '挣了{}钱,当前余额为{}钱'.format(money, g_money))
                # Notify consumers before the lock is released
                lock.notify_all()

class Consumer(threading.Thread):
    """Spends money 10 times, waiting on the condition while balance is short."""

    def run(self):
        global g_money
        global g_time
        for _ in range(10):
            # `with` guarantees the condition lock is released on every
            # exit path, including the early return below (the original
            # had to call release() by hand before returning).
            with lock:
                money = random.randint(1, 10)
                while money > g_money:
                    # All 5 producers x 10 rounds = 50 deposits: once they
                    # are all done, no more money will ever arrive.
                    if g_time >= 50:
                        print('所有生产流程结束,time:{}'.format(g_time))
                        return
                    print(threading.current_thread().name,
                          '-----------------准备消费{}钱,余额不足!当前余额为{}钱--------------'.format(money, g_money))
                    lock.wait()
                g_money -= money
                print(threading.current_thread().name, '消费了{}钱,当前余额为{}钱'.format(money, g_money))

def start():
    """Launch five producers, then five consumers."""
    for index in range(5):
        Producer(name='生产者{}'.format(index)).start()
    for index in range(5):
        Consumer(name='消费者{}'.format(index)).start()


if __name__ == '__main__':
    start()

queue

import threading
import queue, random
import time

def add_val(q):
    """Producer: push a random int (1-10) onto the queue every second, forever."""
    while True:
        value = random.randint(1, 10)
        q.put(value)
        print(threading.current_thread().name, '存值:{}'.format(value))
        time.sleep(1)

def get_val(q):
    """Consumer: pop one value from the queue every second, forever."""
    while True:
        item = q.get()
        print(threading.current_thread().name, '取值:{}'.format(item))
        time.sleep(1)

def start():
    """Wire one producer and one consumer thread to a bounded queue."""
    channel = queue.Queue(10)
    producer = threading.Thread(target=add_val, args=(channel,))
    consumer = threading.Thread(target=get_val, args=(channel,))

    producer.start()
    consumer.start()


if __name__ == '__main__':
    start()

demo1

import json
import os.path
from urllib import parse, request
import requests

# Fetch the data
def get_json_obj():
    """Request the hero-skin workList feed and return the parsed JSON."""
    url = 'https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=0&iOrder=0&iSortNumClose=1&jsoncallback=&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1755336231180'
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
        'referer': 'https://pvp.qq.com/'
    }
    # Bug fix: headers must be passed as a keyword argument; passed
    # positionally it becomes the `params` query string instead.
    response = requests.get(url, headers=headers)
    return json.loads(response.text)


# Extract the data
def extract_data(json_obj):
    """Map each skin name to its seven full-resolution image URLs.

    Args:
        json_obj: parsed workList JSON containing a 'List' of skin records
            with percent-encoded 'sProdName' and 'sProdImgNo_1'..'_7' fields.

    Returns:
        dict[str, list[str]]: decoded name -> 7 image URLs.
    """
    res_data = {}
    for data in json_obj['List']:
        name = parse.unquote(data['sProdName'])
        # '/200' selects the thumbnail size; '/0' is the original resolution.
        res_data[name] = [
            parse.unquote(data['sProdImgNo_' + str(i)]).replace('/200', '/0')
            for i in range(1, 8)
        ]
    return res_data


def save_jpg(d):
    """Download every image URL in `d` into E:\\Python\\爬虫\\img/<name>/.

    Args:
        d: dict mapping skin name -> list of image URLs.
    """
    for key in d:
        # Raw string: the original 'E:\Python\爬虫\img' relied on invalid
        # escape sequences being kept literally.
        dirpath = os.path.join(r'E:\Python\爬虫\img', key.strip())
        # makedirs(exist_ok=True) also creates missing parent directories,
        # where os.mkdir would raise if r'E:\Python\爬虫\img' did not exist.
        os.makedirs(dirpath, exist_ok=True)
        for index, image_url in enumerate(d[key], start=1):
            # Bug fix: .format must apply to the '{}.jpg' filename, not to
            # the already-joined path (which only works by accident).
            request.urlretrieve(image_url, os.path.join(dirpath, '{}.jpg'.format(index)))
            print('{}下载完毕'.format(image_url))

if __name__ == '__main__':
    # Fetch the JSON feed, extract the image URLs, then download them all.
    skin_images = extract_data(get_json_obj())
    save_jpg(skin_images)

pyquery

from pyquery import PyQuery as pq

# Load a document straight from a URL
doc = pq(url="http://www.baidu.com", encoding='utf-8')
# print([p for p in dir(doc('a')) if not p.startswith('_') and not p.startswith('__')])

# Find all <a> tags and read their href attribute
# for i in doc('a').items():
#     print(i.attr('href'))

# for i in doc('a').items():
#     if i.attr('href').startswith('http'):
#         print(i.attr('href'))

# Read the document from a file instead of a URL
# doc = pq(filename='demo.html')
# print(doc('title').text())

html = '''
    <html>
        <head>
            <title>PyQuery demo</title>
        </head>
        <body>
            <h1>PyQuery demo</h1>
            <div id="main">
                <a href="http://www.mashbing.com">马士兵教育</a>
                <h1>hello</h1>
            </div>
            <h2>Python学习</h2>
        </body>
    </html>
    '''
doc = pq(html)
# The element with id "main", then its parent / children / siblings
print(doc('#main'))
print(doc('#main').parent())
print(doc('#main').children())
print(doc('#main').siblings())
# href attribute of the first <a>; inner HTML and plain text of #main
print(doc('a').attr('href'))
print(doc('#main').html())
print(doc('#main').text())