Python Web Scraping (爬虫)
XPath
import requests
from lxml import etree
def send_request():
    """GET the shuqi.com homepage with a desktop Chrome User-Agent.

    Returns the requests.Response object.
    """
    url = 'https://www.shuqi.com/'
    headers = {'user-agent':
               'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'}
    # FIX: the original called requests.get(url, headers); the second positional
    # argument of requests.get is `params`, so the User-Agent was never sent.
    return requests.get(url, headers=headers)
def extract_data(resp):
    """Parse the shuqi.com homepage response into a list of book dicts.

    resp: a requests.Response whose .text is the homepage HTML.
    Returns [{'name': ..., 'author': ..., 'status': ..., 'img': ...}, ...].
    """
    e = etree.HTML(resp.text)
    img = e.xpath('//div[@class="common-slide"]/a/img/@src')
    name = e.xpath('//div[@class="book-info-container"]/a/text()')
    author = e.xpath('//div[@class="book-info-container"]/div[@class="author-name"]/text()')
    status = e.xpath('//div[@class="book-info-container"]/div[@class="book-tag"]/text()')
    field = ['name', 'author', 'status', 'img']
    # zip truncates to the shortest column list, keeping the four fields aligned;
    # a dict comprehension replaces the original index-based inner loop.
    return [
        {key: value.strip() for key, value in zip(field, book)}
        for book in zip(name, author, status, img)
    ]
if __name__ == '__main__':
    # fetch the page, then scrape and display the book list
    response = send_request()
    print(extract_data(response))
BeautifulSoup
中文文档
case1
from bs4 import BeautifulSoup

html = '''
<html>
<head></head>
<body>
<h1 class='hello'>hello world <!--HELLO WORLD--></h1>
<div id='d1' class='c1'>
<span>123</span>
<span>456</span>
<span>789</span>
</div>
<div id='d2' class='c2'>
<h1>
<span>1232</span>
<span>4562</span>
<span>7892</span>
</h1>
</div>
</body>
</html>
'''

# Build the parse tree with the lxml backend
soup = BeautifulSoup(html, 'lxml')
# CSS selector: span children of the h1 child of the div with id d2
for span in soup.select('#d2>h1>span'):
    print(span.text)
# Find the div tag whose id is d2
print(soup.find('div', id='d2'))
# Read the class attribute of the first h1 tag
print(soup.h1.attrs['class'])
# Two ways to read a tag's contents: .text and .string
print(soup.h1.text)
print(soup.h1.string)
case2
import dataclasses
import requests
from bs4 import BeautifulSoup
url = 'https://www.biqu68.com/book/33787/'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'}


@dataclasses.dataclass
class Book():
    """Metadata scraped from a novel's index page."""
    author: str
    category: str
    status: str
    last_updatetime: str
    word_count: str


# FIX: the original passed `headers` as the second positional argument of
# requests.get, which is `params` — the User-Agent header was never sent.
html = requests.get(url, headers=headers)
html.encoding = 'utf-8'
soup = BeautifulSoup(html.text.replace(' ', ''), 'html.parser')
book_infos = soup.find('div', {'class': 'fix'}).find_all('p')
# keep the value part after the fullwidth colon, skipping the "动作" entry
val_lst = [p.text.strip().split(':')[1] for p in book_infos if p.text.find('动作') == -1]
b = Book(val_lst[0], val_lst[1], val_lst[2], val_lst[3], val_lst[4])
print(b)
Json
from dataclasses import dataclass, asdict
import json
@dataclass
class User():
    """Simple user record used to demo dataclass <-> JSON round-tripping."""
    name: str
    age: int
    sex: int


if __name__ == '__main__':
    u = User(name='zs', age=12, sex=1)
    # u_str = json.dumps(asdict(u), ensure_ascii=False)
    # json.dump(asdict(u), open('user.txt', 'w'), ensure_ascii=False)
    # FIX: the original used json.load(open(...)) and never closed the file;
    # a context manager closes it deterministically. Encoding made explicit.
    with open('user.txt', 'r', encoding='utf-8') as f:
        res = json.load(f)
    print(res)
CSV
# encoding='utf-8': sets the text encoding used for the file's contents
# newline='': prevents the csv module from writing doubled row separators on Windows
def write_data():
    """Append one row, then two more rows, to stu.csv (UTF-8)."""
    # newline='' avoids doubled blank lines between rows on Windows
    with open('stu.csv', 'a+', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Jhone', 23, '0001', '男'])  # single-row write
        # FIX: the original passed SET literals to writerows; sets are
        # unordered, so each row's column order was arbitrary. Use lists.
        writer.writerows([['Jhone', 23, '0001', '男'], ['Henri', 10, '0002', '女']])  # multi-row write
def read_data():
    """Print every row of stu.csv, one list per line."""
    with open('stu.csv', 'r', newline='', encoding='utf-8') as file:
        for record in csv.reader(file):
            print(record)
Excel
# --- write a workbook ---
wb = openpyxl.Workbook()
sheet = wb.active
sheet['A1'].value = '中国美丽'
# append a single header row
sheet.append(['姓名', '年龄', '成绩'])
# append several data rows
for record in [['张三', '26', '36'], ['Jone', '35', '52'], ['Henr', '66', '10']]:
    sheet.append(record)
wb.save('test.xlsx')

# --- read it back ---
wb = openpyxl.load_workbook('test.xlsx')
sheet = wb.active
# cell = sheet['A1']
# value = cell.value
# iterate every cell of column A
for cell in sheet['A']:
    print(cell.value)
# iterate every cell of row 1
for cell in sheet[1]:
    print(cell.value)
# slice spanning columns B through C
columns = sheet['B:C']
pymongo
import pymongo
# Obtain a client connection
client = pymongo.MongoClient('localhost', 27017)
# Select the `school` database
db = client.school
# FIX: reuse the `db` handle instead of repeating client.school — the
# original assigned `db` and then never used it.
coll = db.student
# Query every document in the collection
res = coll.find()
for s in res:
    print(s)
Mysql
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker, backref
# MySQL DSN: user root / password root on localhost:3306, database `py`, utf8mb4
db_url = 'mysql+pymysql://root:root@127.0.0.1:3306/py?charset=utf8mb4'
engine = create_engine(db_url)
Base = declarative_base()
# session factory bound to the engine, instantiated immediately
session = sessionmaker(bind=engine)()
class Person(Base):
    """ORM model for table t_person."""
    __tablename__ = 't_person'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50))

    def __str__(self):
        # same rendering as the original .format() template
        return f'[Person[id={self.id}, name={self.name}]]'
class Card(Base):
    """ORM model for table t_card; each card belongs to one t_person row."""
    __tablename__ = 't_card'

    id = Column(Integer, primary_key=True, autoincrement=True)
    card_number = Column(String(50))
    p_id = Column(Integer, ForeignKey('t_person.id'))
    # one-to-one: Person gains an `id_card` attribute (uselist=False)
    person = relationship('Person', backref=backref('id_card', uselist=False))

    def __str__(self):
        # same rendering as the original .format() template
        return f'Card[id = {self.id}, card_number = {self.card_number}, p_id = {self.p_id}]'
def create_table():
    """Drop and recreate every table registered on Base."""
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    # FIX: removed the original trailing session.commit() — DDL executed via
    # metadata.drop_all/create_all runs on the engine, not this session, so
    # the commit was a misleading no-op.
def add_person():
    """Insert three sample Person rows."""
    people = [Person(name=n) for n in ('zs', 'ls', 'ww')]
    session.add_all(people)
    session.commit()
def add_card():
    """Insert three sample Card rows (not yet linked to persons)."""
    cards = [Card(card_number=num) for num in ('1001', '1002', '1003')]
    session.add_all(cards)
    session.commit()
def edit_card():
    """Link each card to a person: the N-th card gets p_id = N."""
    cards = session.query(Card).all()
    # FIX: enumerate replaces the range(len(...)) index loop, and p_id is
    # assigned an int — the original assigned str(i + 1) to an Integer column.
    for idx, card in enumerate(cards, start=1):
        card.p_id = idx
        print(card)
    session.commit()
def query_person():
    """Print every Person row."""
    for person in session.query(Person).all():
        print(person)
if __name__ == '__main__':
    # create_table()
    # add_person()
    # add_card()
    # edit_card()
    # fetch card 1 and follow its relationship to the owning person
    first_card = session.query(Card).filter(Card.id == '1').first()
    print(first_card)
    print(first_card.person.name)
多线程
1. 锁
threading.Lock()
import threading
import random
import time
# Create the lock guarding the shared balance
lock = threading.Lock()
# shared balance; Producer and Consumer access it only while holding `lock`
g_money=0
class Producer(threading.Thread):
    """Earns a random amount 10 times, updating the shared balance under `lock`."""

    def run(self):
        global g_money
        for _ in range(10):
            money = random.randint(1, 10)
            # FIX: hold the lock only while touching shared state. The original
            # slept 1s inside the critical section, which serialized every
            # producer/consumer thread behind the sleep. `with` also guarantees
            # release if print/format raises.
            with lock:
                g_money += money
                print(threading.current_thread().name, '挣了{}钱,当前余额为{}钱'.format(money, g_money))
            time.sleep(1)
class Consumer(threading.Thread):
    """Tries to spend a random amount 10 times; skips when the balance is short."""

    def run(self):
        global g_money
        for _ in range(10):
            money = random.randint(1, 10)
            # FIX: sleep moved outside the critical section (the original held
            # the lock through a 1s sleep, serializing all threads); `with`
            # guarantees the release.
            with lock:
                if money <= g_money:
                    g_money -= money
                    print(threading.current_thread().name, '消费了{}钱,当前余额为{}钱'.format(money, g_money))
                else:
                    print(threading.current_thread().name, '准备消费{}钱,余额不足!当前余额为{}钱'.format(money, g_money))
            time.sleep(1)
def start():
    """Spawn five producer threads and five consumer threads."""
    for idx in range(5):
        Producer(name='生产者{}'.format(idx)).start()
        Consumer(name='消费者{}'.format(idx)).start()


if __name__ == '__main__':
    start()
threading.Condition()
import threading
import random
import time
# Condition variable: serves as the mutex for the shared counters and lets
# consumers block until a producer deposits money
lock = threading.Condition()
# shared balance
g_money = 0
# number of completed production operations (5 producers x 10 rounds = 50 max)
g_time = 0
class Producer(threading.Thread):
    """Deposits money 10 times; wakes all waiting consumers after each deposit."""

    def run(self):
        global g_money
        global g_time
        for _ in range(10):
            with lock:
                amount = random.randint(1, 10)
                g_money += amount
                g_time += 1
                print(threading.current_thread().name, '挣了{}钱,当前余额为{}钱'.format(amount, g_money))
                # notify consumers blocked in wait() before the lock is released
                lock.notify_all()
class Consumer(threading.Thread):
    # Spends a random amount 10 times; waits on the condition while the
    # balance is insufficient, and gives up once all production is finished.
    def run(self):
        global g_money
        global g_time
        for _ in range(10):
            lock.acquire()
            money = random.randint(1, 10)
            # Re-check after every wakeup: another consumer may have already
            # spent the newly produced money before this thread reacquired
            # the condition's lock.
            while money > g_money:
                # 5 producers x 10 rounds = 50 total deposits; once reached,
                # no more money will ever arrive, so stop waiting.
                if g_time >= 50:
                    print('所有生产流程结束,time:{}'.format(g_time))
                    lock.release()
                    return
                print(threading.current_thread().name,
                      '-----------------准备消费{}钱,余额不足!当前余额为{}钱--------------'.format(money, g_money))
                # releases the lock, blocks until a producer's notify_all,
                # then reacquires before returning
                lock.wait()
            g_money -= money
            print(threading.current_thread().name, '消费了{}钱,当前余额为{}钱'.format(money, g_money))
            lock.release()
def start():
    """Launch five producers first, then five consumers."""
    for idx in range(5):
        Producer(name='生产者{}'.format(idx)).start()
    for idx in range(5):
        Consumer(name='消费者{}'.format(idx)).start()


if __name__ == '__main__':
    start()
queue
import threading
import queue, random
import time
def add_val(q):
    """Endlessly push a random 1-10 integer onto the queue, once per second.

    Blocks on q.put when the bounded queue is full.
    """
    while True:
        value = random.randint(1, 10)
        q.put(value)
        print(threading.current_thread().name, '存值:{}'.format(value))
        time.sleep(1)
def get_val(q):
    """Endlessly pop values from the queue, once per second.

    Blocks on q.get when the queue is empty.
    """
    while True:
        value = q.get()
        print(threading.current_thread().name, '取值:{}'.format(value))
        time.sleep(1)
def start():
    """Wire a bounded queue (capacity 10) between a producer and a consumer thread."""
    channel = queue.Queue(10)
    producer = threading.Thread(target=add_val, args=(channel,))
    consumer = threading.Thread(target=get_val, args=(channel,))
    producer.start()
    consumer.start()


if __name__ == '__main__':
    start()
demo1
import json
import os.path
from urllib import parse, request
import requests
# 获取数据
def get_json_obj():
    """Fetch the hero-skin work list from the QQ game API and parse it as JSON."""
    url = 'https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=0&iOrder=0&iSortNumClose=1&jsoncallback=&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1755336231180'
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
        'referer': 'https://pvp.qq.com/'
    }
    # FIX: pass headers by keyword — the original requests.get(url, headers)
    # bound the dict to `params`, so the referer/UA headers were never sent.
    response = requests.get(url, headers=headers)
    return json.loads(response.text)
# 提取数据
def extract_data(json_obj):
res_data = {}
for data in json_obj['List']:
name = parse.unquote(data['sProdName'])
img_url_lst = []
for i in range(1, 8):
img_url_lst.append(parse.unquote(data['sProdImgNo_' + str(i)]).replace('/200', '/0'))
res_data[name] = img_url_lst
return res_data
def save_jpg(d):
    """Download every image in d ({name: [urls]}) into a per-name folder."""
    for key in d:
        # FIX: raw string — the original 'E:\Python\爬虫\img' relied on '\P'
        # not being an escape sequence (a DeprecationWarning on new Pythons).
        dirpath = os.path.join(r'E:\Python\爬虫\img', key.strip())
        # FIX: makedirs(exist_ok=True) replaces the exists()+mkdir pair and
        # also creates missing parent directories.
        os.makedirs(dirpath, exist_ok=True)
        for index, image_url in enumerate(d[key]):
            request.urlretrieve(image_url, os.path.join(dirpath, '{}.jpg'.format(index + 1)))
            print('{}下载完毕'.format(d[key][index]))
if __name__ == '__main__':
    # fetch -> extract -> download
    payload = get_json_obj()
    images = extract_data(payload)
    save_jpg(images)
pyquery
from pyquery import PyQuery as pq

# Load a document straight from a URL (performs a network request here)
doc = pq(url="http://www.baidu.com", encoding='utf-8')
# print([p for p in dir(doc('a')) if not p.startswith('_') and not p.startswith('__')])
# Find every <a> tag and read its href attribute
# for i in doc('a').items():
#     print(i.attr('href'))
# for i in doc('a').items():
#     if i.attr('href').startswith('http'):
#         print(i.attr('href'))
# Load a document from a local file instead
# doc = pq(filename='demo.html')
# print(doc('title').text())
html = '''
<html>
<head>
<title>PyQuery demo</title>
</head>
<body>
<h1>PyQuery demo</h1>
<div id="main">
<a href="http://www.mashbing.com">马士兵教育</a>
<h1>hello</h1>
</div>
<h2>Python学习</h2>
</body>
</html>
'''

# Parse the in-memory HTML and walk around the #main element
page = pq(html)
main_div = page('#main')
print(main_div)
print(main_div.parent())
print(main_div.children())
print(main_div.siblings())
# href of the first <a> in the document
print(page('a').attr('href'))
# inner HTML vs. plain text of #main
print(main_div.html())
print(main_div.text())
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小王
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果