import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
import random
from concurrent.futures import ThreadPoolExecutor
import json
class WebScraper:
    """Generic same-domain web crawler.

    Starting from one URL, pages are fetched level by level with a thread
    pool. Each fetched page is passed to ``parse_page`` and the returned
    record is appended to ``self.results``. Links are followed only while
    they stay on the domain of ``base_url`` and within ``max_depth``.
    """

    # User-Agent header sent with every request.
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    def __init__(self, base_url, max_depth=2, max_workers=5,
                 delay_range=(1, 3), timeout=10):
        """Set up an empty crawl.

        Args:
            base_url: URL whose domain bounds the crawl; also the default
                start page for ``crawl``.
            max_depth: Deepest link level that is still fetched.
            max_workers: Size of the thread pool used by ``crawl``.
            delay_range: ``(low, high)`` bounds in seconds of the random
                pause made before each request.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url
        self.max_depth = max_depth
        self.visited_urls = set()
        self.results = []
        self.max_workers = max_workers
        self.delay_range = delay_range
        self.timeout = timeout

    def is_valid_url(self, url):
        """Return True when ``url`` has both a scheme and a network location."""
        parsed = urlparse(url)
        return bool(parsed.netloc) and bool(parsed.scheme)

    def should_crawl(self, url, depth):
        """Return True when ``url`` is unvisited, on the base domain and not too deep."""
        if depth > self.max_depth:
            return False
        if url in self.visited_urls:
            return False
        base_domain = urlparse(self.base_url).netloc
        return urlparse(url).netloc == base_domain

    def extract_links(self, soup, current_url):
        """Collect the absolute URLs of all ``<a href>`` links on a page.

        Relative links are resolved against ``current_url``. Fragments are
        removed so that ``/page`` and ``/page#section`` are treated as the
        same document instead of being fetched twice.

        Returns:
            A set of absolute URLs that pass ``is_valid_url``.
        """
        links = set()
        for anchor in soup.find_all('a', href=True):
            absolute_url = urljoin(current_url, anchor['href'])
            # A fragment only addresses a position inside the same document.
            absolute_url = urlparse(absolute_url)._replace(fragment='').geturl()
            if self.is_valid_url(absolute_url):
                links.add(absolute_url)
        return links

    def parse_page(self, soup, url):
        """Build the result record for one page (subclasses may override).

        Returns:
            A dict with the page URL, its title (empty string when there is
            no usable title), the first 500 characters of its text and the
            number of ``<a>`` tags.
        """
        title_tag = soup.title
        # ``.string`` is None when <title> holds nested markup; report ''
        # in that case, and store a plain ``str`` in the record.
        title_text = title_tag.string if title_tag else None
        return {
            'url': url,
            'title': str(title_text) if title_text is not None else '',
            'text': soup.get_text(strip=True)[:500],
            'links_count': len(soup.find_all('a')),
        }

    def crawl_page(self, url, depth=0):
        """Fetch and parse one page.

        The URL is marked as visited before the request is made, so a failed
        page is not retried. Any error during fetching or parsing is printed
        and treated as "no links found".

        Returns:
            The set of links found on the page, or an empty set when the page
            is skipped, fails, or sits at ``max_depth``.
        """
        if not self.should_crawl(url, depth):
            return set()
        self.visited_urls.add(url)
        try:
            # Random pause before each request.
            time.sleep(random.uniform(*self.delay_range))
            headers = {'User-Agent': self.USER_AGENT}
            response = requests.get(url, headers=headers, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'lxml')
            self.results.append(self.parse_page(soup, url))
            if depth < self.max_depth:
                return self.extract_links(soup, url)
            return set()
        except Exception as e:
            print(f"爬取 {url} 失败:{e}")
            return set()

    def crawl(self, start_url=None):
        """Crawl level by level, starting at ``start_url`` (default: ``base_url``).

        All URLs of one depth level are submitted to the thread pool
        together; the links they return, minus already visited URLs, form
        the next level.
        """
        start_url = start_url or self.base_url
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            urls_to_crawl = {start_url}
            depth = 0
            while urls_to_crawl and depth <= self.max_depth:
                print(f"深度 {depth}: 爬取 {len(urls_to_crawl)} 个 URL")
                futures = [
                    executor.submit(self.crawl_page, url, depth)
                    for url in urls_to_crawl
                ]
                new_urls = set()
                for future in futures:
                    try:
                        new_urls.update(future.result())
                    except Exception as e:
                        print(f"爬取失败:{e}")
                urls_to_crawl = new_urls - self.visited_urls
                depth += 1

    def save_results(self, filename):
        """Write the collected results to ``filename`` as UTF-8 JSON."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"结果已保存到 {filename},共 {len(self.results)} 条记录")
from bs4 import BeautifulSoup
import re
from datetime import datetime
class SmartContentExtractor:
    """Heuristic helpers that pull content, metadata and comments from a page.

    All methods are static and take a parsed BeautifulSoup document.
    """

    @staticmethod
    def extract_main_content(soup):
        """Return the element most likely to hold the page's main content.

        Strategies are tried from most to least specific; the first one that
        yields an element with more than 100 characters of text wins:

        1. an ``<article>`` element,
        2. a ``<div>`` whose class matches ``content|main|post``,
        3. an element marked ``itemprop="articleBody"``,
        4. the ``<div>``/``<section>`` with the most text.

        Falls back to ``soup.body`` (or ``soup`` itself when there is no
        body). A strategy that raises is skipped.
        """
        def largest_block():
            # Generic fallback; it matches almost any page, so it runs last.
            candidates = soup.find_all(['div', 'section'])
            if not candidates:
                return None
            return max(candidates, key=lambda node: len(node.get_text(strip=True)))

        strategies = [
            lambda: soup.find('article'),
            lambda: soup.find('div', class_=re.compile(r'content|main|post')),
            lambda: soup.find(attrs={'itemprop': 'articleBody'}),
            largest_block,
        ]
        for strategy in strategies:
            try:
                element = strategy()
                if element and len(element.get_text(strip=True)) > 100:
                    return element
            except Exception:
                # Best effort: a failing heuristic must not abort extraction.
                continue
        return soup.body if soup.body else soup

    @staticmethod
    def extract_metadata(soup):
        """Collect structured metadata from a page.

        Returns:
            A dict that may contain ``open_graph`` (``og:*`` meta tags, keyed
            without the prefix), ``twitter_card`` (``twitter:*`` meta tags,
            keyed without the prefix) and ``json_ld`` (decoded
            ``application/ld+json`` scripts). It always contains
            ``publication_date``: the first date-like string found in the
            page text, or None.
        """
        import json

        metadata = {}

        og_data = {
            tag['property'][3:]: tag.get('content', '')
            for tag in soup.find_all('meta', property=re.compile(r'^og:'))
        }
        if og_data:
            metadata['open_graph'] = og_data

        twitter_data = {
            tag['name'][8:]: tag.get('content', '')
            for tag in soup.find_all('meta', attrs={'name': re.compile(r'^twitter:')})
        }
        if twitter_data:
            metadata['twitter_card'] = twitter_data

        json_ld_data = []
        for script in soup.find_all('script', type='application/ld+json'):
            raw = script.string
            if raw is None:
                # Script tag without a single text child: nothing to decode.
                continue
            try:
                json_ld_data.append(json.loads(raw))
            except (TypeError, ValueError):
                # Malformed JSON-LD is ignored.
                continue
        if json_ld_data:
            metadata['json_ld'] = json_ld_data

        pub_date = None
        date_patterns = [
            r'\d{4}-\d{2}-\d{2}',
            r'\d{2}/\d{2}/\d{4}',
            r'\d{4}年\d{2}月\d{2}日'
        ]
        for pattern in date_patterns:
            regex = re.compile(pattern)
            text_node = soup.find(string=regex)
            if text_node:
                # Keep only the date itself, not the whole text node around it.
                found = regex.search(str(text_node))
                pub_date = found.group() if found else str(text_node)
                break
        metadata['publication_date'] = pub_date
        return metadata

    @staticmethod
    def extract_comments(soup):
        """Return comment-like elements that have more than 20 characters of text.

        The selectors overlap (an element with class ``comment`` also matches
        ``[class*="comment"]``), so each element is reported once, under the
        first selector that matched it.

        Returns:
            A list of dicts with the keys ``text``, ``html`` and ``selector``.
        """
        comments = []
        comment_selectors = [
            '.comment', '.comments', '#comments',
            '[class*="comment"]', '[id*="comment"]'
        ]
        seen = set()
        for selector in comment_selectors:
            for element in soup.select(selector):
                # Deduplicate by identity: equal-looking comments are distinct
                # elements, while the same element must appear only once.
                if id(element) in seen:
                    continue
                seen.add(id(element))
                comment_text = element.get_text(strip=True)
                if len(comment_text) > 20:
                    comments.append({
                        'text': comment_text,
                        'html': str(element),
                        'selector': selector
                    })
        return comments
import requests
from bs4 import BeautifulSoup
import json
import time
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional
import smtplib
from email.mime.text import MIMEText
import schedule
@dataclass
class Product:
    """One scraped reading of a product listing."""
    name: str
    price: float
    # List price before any reduction; None when the page shows none.
    original_price: Optional[float]
    url: str
    currency: str
    # Moment the reading was taken.
    timestamp: datetime
    website: str

    @property
    def discount(self) -> Optional[float]:
        """Percentage saved relative to ``original_price``, rounded to 0.1.

        Returns None when there is no positive original price to compare to.
        """
        base = self.original_price
        if not base or not base > 0:
            return None
        ratio = self.price / base
        return round((1 - ratio) * 100, 1)
class PriceMonitor:
    """Price monitor for product pages on Amazon, Taobao/Tmall and JD.

    Configuration is read from a JSON file. Each monitoring pass scrapes
    every configured product, compares its price with the previous reading
    kept in ``self.price_history`` and e-mails an alert when the drop reaches
    the product's threshold.

    Scrapers report a price they could not read as ``0``. Such readings are
    still listed, but are never stored as a baseline or compared with one.
    """

    # User-Agent header sent with every request.
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    # Per-request timeout in seconds.
    REQUEST_TIMEOUT = 10

    def __init__(self, config_file='config.json'):
        """Create a monitor and load its configuration from ``config_file``."""
        self.products = []
        # Last valid price per product, keyed by ``_history_key``.
        self.price_history = {}
        self.load_config(config_file)

    def load_config(self, config_file):
        """Load the JSON configuration into ``self.config``."""
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)

    def _fetch_soup(self, url: str, extra_headers: Optional[Dict[str, str]] = None):
        """Download ``url`` and return it parsed; raises on HTTP error status."""
        headers = {'User-Agent': self.USER_AGENT}
        if extra_headers:
            headers.update(extra_headers)
        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        # Do not parse an error page as if it were a product page.
        response.raise_for_status()
        return BeautifulSoup(response.content, 'lxml')

    @staticmethod
    def _select_text(soup, selector: str, default: str = '') -> str:
        """Return the stripped text of the first match, or ``default`` if none."""
        element = soup.select_one(selector)
        return element.get_text(strip=True) if element else default

    @staticmethod
    def _history_key(product: "Product") -> str:
        """Return the ``price_history`` key for ``product``."""
        return f"{product.website}_{product.url}"

    @staticmethod
    def _is_valid_price(price) -> bool:
        """Return True for a positive price; ``None`` and ``0`` mean unknown."""
        return price is not None and price > 0

    @staticmethod
    def _detect_website(url: str) -> str:
        """Guess the scraper name from ``url``; 'auto' when nothing matches."""
        if 'amazon.' in url:
            return 'amazon'
        if 'taobao.' in url or 'tmall.' in url:
            return 'taobao'
        if 'jd.' in url:
            return 'jd'
        return 'auto'

    def scrape_amazon(self, url: str) -> Optional["Product"]:
        """Scrape an Amazon product page.

        Returns:
            The product reading (``price`` is 0 when no price could be read),
            or None when the request or parsing failed.
        """
        try:
            soup = self._fetch_soup(url, {'Accept-Language': 'en-US,en;q=0.9'})
            name = self._select_text(soup, '#productTitle', 'Unknown')
            price_elem = soup.select_one('.a-price .a-offscreen')
            if not price_elem:
                price_elem = soup.select_one('#price_inside_buybox')
            price_text = price_elem.get_text(strip=True) if price_elem else ''
            price = self._parse_price(price_text)
            original_price = self._parse_price(
                self._select_text(soup, '.a-text-price .a-offscreen')
            )
            return Product(
                name=name,
                # 0 marks an unreadable price, as in the other scrapers.
                price=price or 0,
                original_price=original_price,
                url=url,
                currency=self._detect_currency(price_text),
                timestamp=datetime.now(),
                website='Amazon'
            )
        except Exception as e:
            print(f"抓取亚马逊商品失败:{e}")
            return None

    def scrape_taobao(self, url: str) -> Optional["Product"]:
        """Scrape a Taobao/Tmall product page using the configured cookie.

        Returns:
            The product reading (``price`` is 0 when no price could be read),
            or None when the request or parsing failed.
        """
        try:
            soup = self._fetch_soup(
                url, {'Cookie': self.config.get('taobao_cookie', '')}
            )
            name = self._select_text(soup, '.tb-detail-hd h1', 'Unknown')
            price_selectors = ['.tm-price', '.tb-rmb-num', '[property="og:product:price"]']
            price = None
            for selector in price_selectors:
                price_elem = soup.select_one(selector)
                if not price_elem:
                    continue
                # A <meta> tag has no text; its value is in ``content``.
                price_text = price_elem.get_text(strip=True) or price_elem.get('content', '')
                price = self._parse_price(price_text)
                if price:
                    break
            return Product(
                name=name,
                price=price or 0,
                original_price=None,
                url=url,
                currency='CNY',
                timestamp=datetime.now(),
                website='Taobao'
            )
        except Exception as e:
            print(f"抓取淘宝商品失败:{e}")
            return None

    def scrape_jd(self, url: str) -> Optional["Product"]:
        """Scrape a JD product page.

        Returns:
            The product reading (``price`` is 0 when no price could be read),
            or None when the request or parsing failed.
        """
        try:
            soup = self._fetch_soup(url)
            name = self._select_text(soup, '.sku-name', 'Unknown')
            price = self._parse_price(self._select_text(soup, '.price .num'))
            original_price = self._parse_price(self._select_text(soup, '.price .del'))
            return Product(
                name=name,
                price=price or 0,
                original_price=original_price,
                url=url,
                currency='CNY',
                timestamp=datetime.now(),
                website='JD'
            )
        except Exception as e:
            print(f"抓取京东商品失败:{e}")
            return None

    def _parse_price(self, price_text: str) -> Optional[float]:
        """Extract the first number from ``price_text``.

        Commas are dropped as thousands separators before matching.

        Returns:
            The parsed value, or None when the text is empty or has no number.
        """
        import re
        if not price_text:
            return None
        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        if match:
            try:
                return float(match.group())
            except ValueError:
                return None
        return None

    def _detect_currency(self, price_text: str) -> str:
        """Map a currency symbol found in ``price_text`` to its code (default 'USD')."""
        currency_symbols = {
            '$': 'USD', '€': 'EUR', '£': 'GBP', '¥': 'CNY', '₹': 'INR'
        }
        for symbol, currency in currency_symbols.items():
            if symbol in price_text:
                return currency
        return 'USD'

    def check_price_drop(self, product: "Product", threshold: float = 10.0) -> bool:
        """Report whether ``product`` dropped by at least ``threshold`` percent.

        The price is compared with the stored baseline for the product. When
        no alert-worthy drop is found, the current price becomes the new
        baseline. When a drop is reported the baseline is left untouched, so
        the caller decides when to record the new price.

        An unknown current price (``None`` or ``0``) never reports a drop and
        never replaces the baseline. An unknown baseline is simply replaced.
        """
        if not self._is_valid_price(product.price):
            return False
        product_key = self._history_key(product)
        old_price = self.price_history.get(product_key)
        if self._is_valid_price(old_price):
            price_drop = ((old_price - product.price) / old_price) * 100
            if price_drop >= threshold:
                return True
        self.price_history[product_key] = product.price
        return False

    def send_price_alert(self, product: "Product", old_price: float):
        """E-mail a price-drop alert if e-mail is enabled in the configuration.

        Failures while building or sending the message are printed, not raised.
        """
        email_config = self.config.get('email', {})
        if not email_config.get('enabled', False):
            return
        try:
            price_drop = ((old_price - product.price) / old_price) * 100
            subject = f"💰 价格提醒:{product.name} 降价 {price_drop:.1f}%"
            message = (
                "\n"
                f"商品名称:{product.name}\n"
                f"当前价格:{product.currency} {product.price:.2f}\n"
                f"之前价格:{product.currency} {old_price:.2f}\n"
                f"降价幅度:{price_drop:.1f}%\n"
                f"商品链接:{product.url}\n"
                f"监控时间:{product.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
            )
            msg = MIMEText(message)
            msg['Subject'] = subject
            msg['From'] = email_config['from']
            msg['To'] = ', '.join(email_config['to'])
            with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
                server.send_message(msg)
            print(f"价格提醒已发送:{product.name}")
        except Exception as e:
            print(f"发送提醒失败:{e}")

    def monitor_products(self):
        """Scrape every configured product once and alert on price drops.

        Products whose website cannot be determined, or whose scrape fails,
        are skipped. The loop pauses two seconds after each product.
        """
        product_configs = self.config.get('products', [])
        print(f"开始监控 {len(product_configs)} 个商品...")
        scrapers = {
            'amazon': self.scrape_amazon,
            'taobao': self.scrape_taobao,
            'jd': self.scrape_jd,
        }
        for product_config in product_configs:
            url = product_config['url']
            website = product_config.get('website', 'auto')
            threshold = product_config.get('threshold', 10.0)
            if website == 'auto':
                website = self._detect_website(url)
            scraper = scrapers.get(website)
            product = scraper(url) if scraper else None
            if product:
                self.products.append(product)
                # Only a real price may trigger an alert or become a baseline.
                if self._is_valid_price(product.price):
                    # Same key as check_price_drop, so both see one baseline.
                    product_key = self._history_key(product)
                    old_price = self.price_history.get(product_key)
                    if self._is_valid_price(old_price) and self.check_price_drop(product, threshold):
                        self.send_price_alert(product, old_price)
                    self.price_history[product_key] = product.price
                discount_info = f" (折扣:{product.discount}%)" if product.discount else ""
                print(f"{product.website}: {product.name} - {product.currency} {product.price:.2f}{discount_info}")
            time.sleep(2)

    def run_scheduled_monitoring(self):
        """Run one pass immediately, then one every hour; never returns."""
        print("价格监控系统启动...")
        schedule.every().hour.do(self.monitor_products)
        self.monitor_products()
        while True:
            schedule.run_pending()
            time.sleep(60)
"""
{
"email": {
"enabled": false,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "your_email@gmail.com",
"password": "your_password",
"from": "your_email@gmail.com",
"to": ["recipient@example.com"]
},
"taobao_cookie": "your_taobao_cookie_here",
"products": [
{
"name": "示例商品 1",
"url": "https://www.amazon.com/dp/B08N5WRWNW",
"website": "amazon",
"threshold": 10.0
},
{
"name": "示例商品 2",
"url": "https://item.taobao.com/item.htm?id=123456789",
"website": "taobao",
"threshold": 15.0
}
]
}
"""
if __name__ == "__main__":
    # Script entry point: load config.json and run a single monitoring pass.
    PriceMonitor('config.json').monitor_products()