沃尔玛爬虫完整构建指南:用Python打造高效商品数据采集系统

沃尔玛爬虫(Walmart Scraper)作为电商数据采集的重要工具,能够帮助卖家、分析师和开发者自动获取沃尔玛平台的商品信息、价格数据和市场趋势。在竞争激烈的电商环境中,掌握实时的商品数据对于制定营销策略、价格优化和竞品分析至关重要。本文将详细介绍如何使用Python构建一个功能完善的沃尔玛爬虫系统,涵盖从基础设置到高级优化的全过程。

为什么需要构建沃尔玛爬虫

在深入技术实现之前,我们先了解构建沃尔玛爬虫的核心价值。沃尔玛作为全球最大的零售商之一,其平台上包含数百万种商品,价格变化频繁,促销活动不断。对于电商从业者而言,及时获取这些数据能够支撑以下工作:

  • 市场趋势分析:了解热销商品和消费者偏好
  • 竞品价格监控:实时跟踪竞争对手的价格策略
  • 库存管理优化:基于供需数据调整采购计划
  • 营销策略制定:根据促销信息制定相应策略

然而,手动收集这些数据不仅效率低下,而且容易出错。这就是Python沃尔玛数据抓取(Python Walmart Data Scraping)技术发挥作用的地方。

技术准备与环境搭建

1. 开发环境配置

首先确保您的系统已安装Python 3.7或更高版本。我们将使用以下核心库来构建我们的沃尔玛商品信息采集器(Walmart Product Information Crawler):

# requirements.txt
requests==2.31.0
beautifulsoup4==4.12.2
selenium==4.15.0
pandas==2.1.3
openpyxl==3.1.2
fake-useragent==1.4.0
python-dotenv==1.0.0

安装依赖:

pip install -r requirements.txt

2. 基础项目结构

walmart_scraper/
├── config/
│   ├── __init__.py
│   └── settings.py
├── scrapers/
│   ├── __init__.py
│   ├── base_scraper.py
│   └── walmart_scraper.py
├── utils/
│   ├── __init__.py
│   ├── proxy_handler.py
│   └── data_processor.py
├── data/
│   └── output/
├── main.py
└── requirements.txt
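
项目结构中预留了 config/settings.py 用于集中管理可调参数。下面给出一个极简的示例草稿(其中的字段名与默认值均为本文的假设,可按需增删):

# config/settings.py(示例草稿,字段均为假设)
import os
from dotenv import load_dotenv

load_dotenv()  # 从 .env 文件加载环境变量(对应依赖 python-dotenv)

# 请求相关参数
REQUEST_TIMEOUT = 15            # 单次请求超时时间(秒)
MAX_RETRIES = 3                 # 请求失败时的最大重试次数
MIN_DELAY, MAX_DELAY = 1, 3     # 随机延迟区间(秒)

# 敏感配置从环境变量读取,避免硬编码
PROXY_FILE = os.getenv("PROXY_FILE", "proxies.txt")
CAPTCHA_API_KEY = os.getenv("CAPTCHA_API_KEY", "")

# 输出目录
OUTPUT_DIR = os.path.join("data", "output")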

核心爬虫组件开发

1. 基础爬虫类设计

让我们从创建一个基础的爬虫类开始:

# scrapers/base_scraper.py
import requests
import time
import random
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import logging

class BaseScraper:
    def __init__(self):
        self.session = requests.Session()
        self.ua = UserAgent()
        self.setup_logging()
        
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scraper.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def get_headers(self):
        """生成随机请求头"""
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
    
    def random_delay(self, min_delay=1, max_delay=3):
        """随机延迟防止被识别"""
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)
    
    def make_request(self, url, max_retries=3):
        """发送HTTP请求with重试机制"""
        for attempt in range(max_retries):
            try:
                headers = self.get_headers()
                response = self.session.get(url, headers=headers, timeout=10)
                response.raise_for_status()
                return response
            except requests.RequestException as e:
                self.logger.warning(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    self.random_delay(2, 5)
                else:
                    self.logger.error(f"所有请求尝试失败: {url}")
                    raise

2. 沃尔玛专用爬虫实现

接下来实现专门针对沃尔玛的爬虫类:

# scrapers/walmart_scraper.py
from .base_scraper import BaseScraper
from bs4 import BeautifulSoup
import json
import re
from urllib.parse import urljoin, urlparse, quote_plus

class WalmartScraper(BaseScraper):
    def __init__(self):
        super().__init__()
        self.base_url = "https://www.walmart.com"
        
    def search_products(self, keyword, page=1, max_results=50):
        """搜索商品列表"""
        search_url = f"{self.base_url}/search?q={keyword}&page={page}"
        
        try:
            response = self.make_request(search_url)
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # 提取商品列表
            products = self.extract_product_list(soup)
            self.logger.info(f"成功提取 {len(products)} 个商品信息")
            
            return products[:max_results]
            
        except Exception as e:
            self.logger.error(f"搜索商品失败: {e}")
            return []
    
    def extract_product_list(self, soup):
        """从搜索结果页面提取商品信息"""
        products = []
        
        # 查找商品容器
        product_containers = soup.find_all('div', {'data-automation-id': 'product-tile'})
        
        for container in product_containers:
            try:
                product_data = self.extract_single_product(container)
                if product_data:
                    products.append(product_data)
            except Exception as e:
                self.logger.warning(f"提取单个商品失败: {e}")
                continue
                
        return products
    
    def extract_single_product(self, container):
        """提取单个商品的详细信息"""
        product = {}
        
        try:
            # 商品标题
            title_elem = container.find('span', {'data-automation-id': 'product-title'})
            product['title'] = title_elem.get_text(strip=True) if title_elem else ''
            
            # 价格信息
            price_elem = container.find('div', {'data-automation-id': 'product-price'})
            if price_elem:
                price_text = price_elem.get_text(strip=True)
                product['price'] = self.clean_price(price_text)
            
            # 商品链接
            link_elem = container.find('a', href=True)
            if link_elem:
                product['url'] = urljoin(self.base_url, link_elem['href'])
                # 从URL中提取商品ID
                product['product_id'] = self.extract_product_id(product['url'])
            
            # 评分信息
            rating_elem = container.find('span', class_=re.compile(r'.*rating.*'))
            if rating_elem:
                rating_text = rating_elem.get('aria-label', '')
                product['rating'] = self.extract_rating(rating_text)
            
            # 图片
            img_elem = container.find('img')
            if img_elem:
                product['image_url'] = img_elem.get('src', '')
            
            # 供应商信息
            seller_elem = container.find('span', string=re.compile(r'Sold by'))
            if seller_elem:
                product['seller'] = seller_elem.get_text(strip=True)
            
            return product if product.get('title') else None
            
        except Exception as e:
            self.logger.warning(f"解析商品数据失败: {e}")
            return None
    
    def get_product_details(self, product_url):
        """获取商品详细页面信息"""
        try:
            response = self.make_request(product_url)
            soup = BeautifulSoup(response.content, 'html.parser')
            
            details = {}
            
            # 从script标签中提取JSON数据
            script_tags = soup.find_all('script', {'type': 'application/ld+json'})
            for script in script_tags:
                try:
                    json_data = json.loads(script.string)
                    if '@type' in json_data and json_data['@type'] == 'Product':
                        details.update(self.parse_product_json(json_data))
                        break
                except (json.JSONDecodeError, TypeError):  # script.string 可能为 None
                    continue
            
            # 商品描述
            desc_elem = soup.find('div', {'data-automation-id': 'product-highlights'})
            if desc_elem:
                details['description'] = desc_elem.get_text(strip=True)
            
            # 库存状态
            stock_elem = soup.find('div', {'data-automation-id': 'fulfillment-section'})
            if stock_elem:
                details['in_stock'] = 'in stock' in stock_elem.get_text().lower()
            
            return details
            
        except Exception as e:
            self.logger.error(f"获取商品详情失败: {e}")
            return {}
    
    def clean_price(self, price_text):
        """清理价格文本"""
        if not price_text:
            return None
        
        # 提取数字和小数点
        price_match = re.search(r'\$?(\d+\.?\d*)', price_text.replace(',', ''))
        return float(price_match.group(1)) if price_match else None
    
    def extract_product_id(self, url):
        """从URL中提取商品ID"""
        try:
            parsed_url = urlparse(url)
            path_parts = parsed_url.path.split('/')
            for part in path_parts:
                if part.isdigit():
                    return part
        except Exception:
            pass
        return None
    
    def extract_rating(self, rating_text):
        """提取评分数值"""
        rating_match = re.search(r'(\d+\.?\d*)', rating_text)
        return float(rating_match.group(1)) if rating_match else None
    
    def parse_product_json(self, json_data):
        """解析产品JSON数据"""
        details = {}
        
        if 'name' in json_data:
            details['full_name'] = json_data['name']
        
        if 'offers' in json_data:
            offer = json_data['offers']
            if isinstance(offer, list):
                offer = offer[0]
            
            details['availability'] = offer.get('availability', '')
            details['currency'] = offer.get('priceCurrency', 'USD')
            
            if 'price' in offer:
                details['detailed_price'] = float(offer['price'])
        
        if 'aggregateRating' in json_data:
            rating_data = json_data['aggregateRating']
            details['average_rating'] = float(rating_data.get('ratingValue', 0))
            details['review_count'] = int(rating_data.get('reviewCount', 0))
        
        return details
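
到这里,搜索与详情两个环节就可以串起来使用了。下面是一个最小化的调用示意(能否解析到数据取决于沃尔玛页面当前的DOM结构是否仍与上述选择器一致):

# 使用示例(示意):搜索商品并补充详情
from scrapers.walmart_scraper import WalmartScraper

scraper = WalmartScraper()
products = scraper.search_products("wireless earbuds", max_results=5)

for product in products:
    if product.get('url'):
        # 对每个搜索结果再抓取详情页,并合并到同一字典
        details = scraper.get_product_details(product['url'])
        product.update(details)
        scraper.random_delay(1, 3)  # 控制抓取节奏

print(f"共获得 {len(products)} 条商品数据")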

应对反爬虫策略

1. IP代理池集成

现代电商网站都部署了先进的反爬虫系统。为了构建稳定的自动化沃尔玛爬虫系统(Automated Walmart Scraping System),我们需要集成IP代理池:

# utils/proxy_handler.py
import requests
import random
import threading
import time
from queue import Queue

from scrapers.walmart_scraper import WalmartScraper

class ProxyHandler:
    def __init__(self, proxy_list=None):
        self.proxy_queue = Queue()
        self.failed_proxies = set()
        self.proxy_stats = {}
        self.lock = threading.Lock()
        
        if proxy_list:
            self.load_proxies(proxy_list)
    
    def load_proxies(self, proxy_list):
        """加载代理列表"""
        for proxy in proxy_list:
            self.proxy_queue.put(proxy)
            self.proxy_stats[proxy] = {'success': 0, 'failed': 0}
    
    def get_proxy(self):
        """获取可用代理"""
        with self.lock:
            while not self.proxy_queue.empty():
                proxy = self.proxy_queue.get()
                if proxy not in self.failed_proxies:
                    return proxy
        return None
    
    def test_proxy(self, proxy, test_url="http://httpbin.org/ip"):
        """测试代理是否可用"""
        try:
            proxies = {
                # 普通HTTP(S)代理通常对http和https流量都使用 http:// 前缀
                'http': f'http://{proxy}',
                'https': f'http://{proxy}'
            }
            
            response = requests.get(
                test_url, 
                proxies=proxies, 
                timeout=10,
                headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            )
            
            if response.status_code == 200:
                self.mark_proxy_success(proxy)
                return True
                
        except requests.RequestException:
            pass
        
        self.mark_proxy_failed(proxy)
        return False
    
    def mark_proxy_success(self, proxy):
        """标记代理成功"""
        with self.lock:
            if proxy in self.proxy_stats:
                self.proxy_stats[proxy]['success'] += 1
            # 成功的代理重新放回队列
            self.proxy_queue.put(proxy)
    
    def mark_proxy_failed(self, proxy):
        """标记代理失败"""
        with self.lock:
            if proxy in self.proxy_stats:
                self.proxy_stats[proxy]['failed'] += 1
            
            # 失败次数过多的代理加入黑名单
            if self.proxy_stats[proxy]['failed'] > 3:
                self.failed_proxies.add(proxy)

# 集成代理的爬虫类
class WalmartScraperWithProxy(WalmartScraper):
    def __init__(self, proxy_list=None):
        super().__init__()
        self.proxy_handler = ProxyHandler(proxy_list) if proxy_list else None
    
    def make_request(self, url, max_retries=3):
        """使用代理发送请求(覆盖基类的make_request,使搜索与详情抓取都经由代理)"""
        for attempt in range(max_retries):
            proxy = self.proxy_handler.get_proxy() if self.proxy_handler else None
            
            try:
                headers = self.get_headers()
                proxies = None
                
                if proxy:
                    proxies = {
                        'http': f'http://{proxy}',
                        'https': f'http://{proxy}'
                    }
                
                response = self.session.get(
                    url, 
                    headers=headers, 
                    proxies=proxies,
                    timeout=15
                )
                response.raise_for_status()
                
                if proxy and self.proxy_handler:
                    self.proxy_handler.mark_proxy_success(proxy)
                
                return response
                
            except requests.RequestException as e:
                if proxy and self.proxy_handler:
                    self.proxy_handler.mark_proxy_failed(proxy)
                
                self.logger.warning(f"代理请求失败 {proxy}: {e}")
                self.random_delay(3, 7)
        
        raise Exception(f"所有代理请求都失败: {url}")
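
在把代理交给爬虫之前,通常会先做一轮可用性筛查。下面是一个简单的调用示意(代理地址为虚构示例):

# 使用示例(示意):先筛查代理,再交给带代理的爬虫
from utils.proxy_handler import ProxyHandler, WalmartScraperWithProxy

raw_proxies = ["203.0.113.10:8080", "203.0.113.11:3128"]  # 虚构示例地址

handler = ProxyHandler(raw_proxies)
usable = [p for p in raw_proxies if handler.test_proxy(p)]  # 逐个测试可用性

scraper = WalmartScraperWithProxy(usable)
products = scraper.search_products("laptop", max_results=10)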

2. 验证码识别与处理

沃尔玛网站可能会出现验证码挑战。我们需要集成验证码识别服务:

# utils/captcha_solver.py
import time
import logging
import requests

class CaptchaSolver:
    def __init__(self, api_key=None, service='2captcha'):
        self.api_key = api_key
        self.service = service
        self.base_url = 'http://2captcha.com' if service == '2captcha' else None
        self.logger = logging.getLogger(__name__)
    
    def solve_image_captcha(self, image_data):
        """解决图片验证码"""
        if not self.api_key:
            self.logger.warning("未配置验证码服务API密钥")
            return None
        
        try:
            # 提交验证码
            submit_url = f"{self.base_url}/in.php"
            
            files = {'file': ('captcha.png', image_data, 'image/png')}
            data = {
                'key': self.api_key,
                'method': 'post'
            }
            
            response = requests.post(submit_url, files=files, data=data)
            result = response.text
            
            if 'OK|' in result:
                captcha_id = result.split('|')[1]
                return self.get_captcha_result(captcha_id)
            
        except Exception as e:
            self.logger.error(f"验证码识别失败: {e}")
        
        return None
    
    def get_captcha_result(self, captcha_id, max_wait=120):
        """获取验证码识别结果"""
        result_url = f"{self.base_url}/res.php"
        
        for _ in range(max_wait // 5):
            try:
                response = requests.get(result_url, params={
                    'key': self.api_key,
                    'action': 'get',
                    'id': captcha_id
                })
                
                result = response.text
                
                if result == 'CAPCHA_NOT_READY':
                    time.sleep(5)
                    continue
                elif 'OK|' in result:
                    return result.split('|')[1]
                else:
                    break
                    
            except Exception as e:
                self.logger.error(f"获取验证码结果失败: {e}")
                break
        
        return None
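
识别服务的调用方式大致如下(示意片段,假设验证码截图已保存为本地的 captcha.png,密钥为占位符):

# 使用示例(示意):对本地验证码图片调用识别服务
from utils.captcha_solver import CaptchaSolver

solver = CaptchaSolver(api_key="YOUR_2CAPTCHA_KEY")  # 密钥为占位符

with open("captcha.png", "rb") as f:
    captcha_text = solver.solve_image_captcha(f.read())

print(f"识别结果: {captcha_text}" if captcha_text else "识别失败或服务未配置")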

数据处理与存储

1. 数据清洗和标准化

# utils/data_processor.py
import pandas as pd
import re
import json
import logging
from datetime import datetime

class DataProcessor:
    def __init__(self):
        self.cleaned_data = []
        self.logger = logging.getLogger(__name__)
    
    def clean_product_data(self, raw_products):
        """清洗商品数据"""
        cleaned_products = []
        
        for product in raw_products:
            cleaned_product = {}
            
            # 标题清洗
            title = product.get('title', '').strip()
            cleaned_product['title'] = self.clean_title(title)
            
            # 价格标准化
            price = product.get('price')
            cleaned_product['price_usd'] = self.standardize_price(price)
            
            # URL标准化
            url = product.get('url', '')
            cleaned_product['product_url'] = self.clean_url(url)
            
            # 评分标准化
            rating = product.get('rating')
            cleaned_product['rating_score'] = self.standardize_rating(rating)
            
            # 添加时间戳
            cleaned_product['scraped_at'] = datetime.now().isoformat()
            
            # 商品ID
            cleaned_product['product_id'] = product.get('product_id', '')
            
            # 图片URL
            cleaned_product['image_url'] = product.get('image_url', '')
            
            # 供应商
            cleaned_product['seller'] = product.get('seller', 'Walmart')
            
            if cleaned_product['title']:  # 只保留有标题的商品
                cleaned_products.append(cleaned_product)
        
        return cleaned_products
    
    def clean_title(self, title):
        """清洗商品标题"""
        if not title:
            return ''
        
        # 移除多余空白字符
        title = re.sub(r'\s+', ' ', title).strip()
        
        # 移除特殊字符但保留基本标点
        title = re.sub(r'[^\w\s\-\(\)\[\]&,.]', '', title)
        
        return title[:200]  # 限制长度
    
    def standardize_price(self, price):
        """标准化价格"""
        if price is None:
            return None
        
        if isinstance(price, str):
            # 移除货币符号和逗号
            price_clean = re.sub(r'[$,]', '', price)
            try:
                return float(price_clean)
            except ValueError:
                return None
        
        return float(price) if price else None
    
    def clean_url(self, url):
        """清洗URL"""
        if not url:
            return ''
        
        # 移除追踪参数
        if '?' in url:
            base_url = url.split('?')[0]
            return base_url
        
        return url
    
    def standardize_rating(self, rating):
        """标准化评分"""
        if rating is None:
            return None
        
        try:
            rating_float = float(rating)
            # 确保评分在0-5范围内
            return max(0, min(5, rating_float))
        except (ValueError, TypeError):
            return None
    
    def save_to_excel(self, products, filename):
        """保存到Excel文件"""
        if not products:
            self.logger.warning("没有数据要保存")
            return
        
        df = pd.DataFrame(products)
        
        # 重新排序列
        column_order = [
            'product_id', 'title', 'price_usd', 'rating_score', 
            'seller', 'product_url', 'image_url', 'scraped_at'
        ]
        
        df = df.reindex(columns=column_order)
        
        # 保存到Excel
        with pd.ExcelWriter(filename, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Products', index=False)
            
            # 添加统计信息
            stats_df = pd.DataFrame({
                '统计项': ['总商品数', '平均价格', '最高价格', '最低价格', '平均评分'],
                '数值': [
                    len(df),
                    df['price_usd'].mean() if df['price_usd'].notna().any() else 0,
                    df['price_usd'].max() if df['price_usd'].notna().any() else 0,
                    df['price_usd'].min() if df['price_usd'].notna().any() else 0,
                    df['rating_score'].mean() if df['rating_score'].notna().any() else 0
                ]
            })
            stats_df.to_excel(writer, sheet_name='Statistics', index=False)
        
        print(f"数据已保存到 {filename}")
    
    def save_to_json(self, products, filename):
        """保存到JSON文件"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(products, f, ensure_ascii=False, indent=2)
        
        print(f"JSON数据已保存到 {filename}")
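
数据处理器的典型用法如下(示意片段,其中的 raw_products 用两条虚构记录代替真实抓取结果):

# 使用示例(示意):清洗原始数据并导出
import os
from utils.data_processor import DataProcessor

# 假设这是前文爬虫返回的原始数据(此处用两条虚构记录代替)
raw_products = [
    {'title': '  Example  Headphones ', 'price': '$59.99', 'rating': '4.5',
     'url': 'https://www.walmart.com/ip/12345?athcpid=abc', 'product_id': '12345'},
    {'title': 'Example Speaker', 'price': 29.0, 'rating': 4.0,
     'url': 'https://www.walmart.com/ip/67890', 'product_id': '67890'},
]

os.makedirs("data/output", exist_ok=True)

processor = DataProcessor()
cleaned = processor.clean_product_data(raw_products)
processor.save_to_excel(cleaned, "data/output/walmart_products.xlsx")
processor.save_to_json(cleaned, "data/output/walmart_products.json")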

2. 完整的主程序实现

现在让我们把所有组件整合到一个完整的沃尔玛商品列表抓取工具(Walmart Product List Scraping Tool)中:

# main.py
import argparse
import sys
import os
from datetime import datetime
from utils.proxy_handler import WalmartScraperWithProxy
from utils.data_processor import DataProcessor
from utils.captcha_solver import CaptchaSolver
import logging

class WalmartScrapingManager:
    def __init__(self, proxy_list=None, captcha_api_key=None):
        self.scraper = WalmartScraperWithProxy(proxy_list)
        self.data_processor = DataProcessor()
        self.captcha_solver = CaptchaSolver(captcha_api_key) if captcha_api_key else None
        self.logger = logging.getLogger(__name__)
    
    def scrape_products(self, keywords, max_products_per_keyword=50, output_format='excel'):
        """批量抓取商品数据"""
        all_products = []
        
        for keyword in keywords:
            self.logger.info(f"开始抓取关键词: {keyword}")
            
            try:
                # 搜索商品列表
                products = self.scraper.search_products(
                    keyword=keyword,
                    max_results=max_products_per_keyword
                )
                
                # 获取详细信息
                detailed_products = []
                for i, product in enumerate(products):
                    if product.get('url'):
                        try:
                            details = self.scraper.get_product_details(product['url'])
                            product.update(details)
                            detailed_products.append(product)
                            
                            # 添加关键词标签
                            product['search_keyword'] = keyword
                            
                            self.logger.info(f"已处理 {i+1}/{len(products)} 个商品")
                            
                            # 随机延迟
                            self.scraper.random_delay(1, 3)
                            
                        except Exception as e:
                            self.logger.warning(f"获取商品详情失败: {e}")
                            continue
                
                all_products.extend(detailed_products)
                self.logger.info(f"关键词 '{keyword}' 抓取完成,获得 {len(detailed_products)} 个商品")
                
            except Exception as e:
                self.logger.error(f"抓取关键词 '{keyword}' 失败: {e}")
                continue
        
        # 数据清洗
        cleaned_products = self.data_processor.clean_product_data(all_products)
        
        # 保存数据
        self.save_results(cleaned_products, output_format)
        
        return cleaned_products
    
    def save_results(self, products, output_format):
        """保存抓取结果"""
        if not products:
            self.logger.warning("没有数据需要保存")
            return
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        if output_format.lower() == 'excel':
            filename = f"data/output/walmart_products_{timestamp}.xlsx"
            self.data_processor.save_to_excel(products, filename)
        elif output_format.lower() == 'json':
            filename = f"data/output/walmart_products_{timestamp}.json"
            self.data_processor.save_to_json(products, filename)
        else:
            # 同时保存两种格式
            excel_filename = f"data/output/walmart_products_{timestamp}.xlsx"
            json_filename = f"data/output/walmart_products_{timestamp}.json"
            self.data_processor.save_to_excel(products, excel_filename)
            self.data_processor.save_to_json(products, json_filename)

def main():
    parser = argparse.ArgumentParser(description='沃尔玛商品数据抓取工具')
    parser.add_argument('--keywords', nargs='+', required=True, help='搜索关键词列表')
    parser.add_argument('--max-products', type=int, default=50, help='每个关键词最大抓取商品数')
    parser.add_argument('--output-format', choices=['excel', 'json', 'both'], default='excel', help='输出格式')
    parser.add_argument('--proxy-file', help='代理列表文件路径')
    parser.add_argument('--captcha-api-key', help='验证码识别服务API密钥')
    
    args = parser.parse_args()
    
    # 确保输出目录存在
    os.makedirs('data/output', exist_ok=True)
    
    # 加载代理列表
    proxy_list = None
    if args.proxy_file and os.path.exists(args.proxy_file):
        with open(args.proxy_file, 'r') as f:
            proxy_list = [line.strip() for line in f if line.strip()]
    
    # 创建爬虫管理器
    scraper_manager = WalmartScrapingManager(
        proxy_list=proxy_list,
        captcha_api_key=args.captcha_api_key
    )
    
    # 开始抓取
    try:
        products = scraper_manager.scrape_products(
            keywords=args.keywords,
            max_products_per_keyword=args.max_products,
            output_format=args.output_format
        )
        
        print(f"\n抓取完成!总共获得 {len(products)} 个商品数据")
        
        # 显示统计信息
        if products:
            prices = [p['price_usd'] for p in products if p.get('price_usd')]
            ratings = [p['rating_score'] for p in products if p.get('rating_score')]
            
            print(f"价格统计: 平均 ${sum(prices)/len(prices):.2f}" if prices else "无价格数据")
            print(f"评分统计: 平均 {sum(ratings)/len(ratings):.2f}" if ratings else "无评分数据")
    
    except KeyboardInterrupt:
        print("\n用户中断抓取过程")
    except Exception as e:
        print(f"抓取过程出现错误: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

常见挑战与解决方案

1. 动态内容加载

现代电商网站大量使用JavaScript动态加载内容。对于这种情况,我们需要使用Selenium来处理:

# scrapers/selenium_scraper.py
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import undetected_chromedriver as uc

class SeleniumWalmartScraper:
    def __init__(self, headless=True, proxy=None):
        self.setup_driver(headless, proxy)
        
    def setup_driver(self, headless=True, proxy=None):
        """配置浏览器驱动"""
        options = uc.ChromeOptions()
        
        if headless:
            options.add_argument('--headless')
        
        # 反检测设置
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)
        
        # 代理设置
        if proxy:
            options.add_argument(f'--proxy-server={proxy}')
        
        # 用户代理
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
        
        self.driver = uc.Chrome(options=options)
        
        # 执行反检测脚本
        self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
    
    def scrape_with_javascript(self, url, wait_selector=None):
        """使用Selenium抓取动态内容"""
        try:
            self.driver.get(url)
            
            # 等待特定元素加载
            if wait_selector:
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
                )
            
            # 滚动页面触发懒加载
            self.scroll_page()
            
            # 获取页面源码
            html_content = self.driver.page_source
            return html_content
            
        except Exception as e:
            print(f"Selenium抓取失败: {e}")
            return None
    
    def scroll_page(self):
        """滚动页面以触发懒加载"""
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        
        while True:
            # 滚动到页面底部
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            
            # 等待新内容加载
            time.sleep(2)
            
            # 计算新的页面高度
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            
            if new_height == last_height:
                break
            
            last_height = new_height
    
    def close(self):
        """关闭浏览器"""
        if hasattr(self, 'driver'):
            self.driver.quit()
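
拿到渲染后的HTML之后,可以直接复用前文基于BeautifulSoup的解析逻辑。下面是一个衔接两者的示意片段(等待选择器沿用前文假设的 data-automation-id 属性):

# 使用示例(示意):Selenium渲染 + 复用已有解析逻辑
from bs4 import BeautifulSoup
from scrapers.selenium_scraper import SeleniumWalmartScraper
from scrapers.walmart_scraper import WalmartScraper

selenium_scraper = SeleniumWalmartScraper(headless=True)
parser = WalmartScraper()  # 只借用其解析方法,不用它发请求

try:
    html = selenium_scraper.scrape_with_javascript(
        "https://www.walmart.com/search?q=laptop",
        wait_selector='[data-automation-id="product-tile"]'
    )
    if html:
        soup = BeautifulSoup(html, 'html.parser')
        products = parser.extract_product_list(soup)
        print(f"解析到 {len(products)} 个商品")
finally:
    selenium_scraper.close()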

2. 分布式爬虫架构

对于大规模数据抓取,我们可以实现分布式爬虫:

# distributed/task_manager.py
import redis
import json
import uuid
from datetime import datetime, timedelta

class TaskManager:
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
        self.task_queue = 'walmart_scrape_tasks'
        self.result_queue = 'walmart_scrape_results'
        
    def add_task(self, keyword, max_products=50, priority=1):
        """添加抓取任务"""
        task_id = str(uuid.uuid4())
        task_data = {
            'task_id': task_id,
            'keyword': keyword,
            'max_products': max_products,
            'priority': priority,
            'created_at': datetime.now().isoformat(),
            'status': 'pending'
        }
        
        # 使用优先级队列
        self.redis_client.zadd(self.task_queue, {json.dumps(task_data): priority})
        return task_id
    
    def get_task(self):
        """获取待处理任务"""
        # 获取最高优先级任务
        task_data = self.redis_client.zpopmax(self.task_queue)
        
        if task_data:
            task_json = task_data[0][0].decode('utf-8')
            return json.loads(task_json)
        
        return None
    
    def save_result(self, task_id, products, status='completed'):
        """保存抓取结果"""
        result_data = {
            'task_id': task_id,
            'products': products,
            'status': status,
            'completed_at': datetime.now().isoformat(),
            'product_count': len(products)
        }
        
        self.redis_client.lpush(self.result_queue, json.dumps(result_data))
    
    def get_results(self, limit=10):
        """获取抓取结果"""
        results = []
        for _ in range(limit):
            result_data = self.redis_client.rpop(self.result_queue)
            if result_data:
                results.append(json.loads(result_data.decode('utf-8')))
            else:
                break
        
        return results

# distributed/worker.py
import time
import logging
from distributed.task_manager import TaskManager
from scrapers.walmart_scraper import WalmartScraperWithProxy

class ScrapingWorker:
    def __init__(self, worker_id, proxy_list=None):
        self.worker_id = worker_id
        self.task_manager = TaskManager()
        self.scraper = WalmartScraperWithProxy(proxy_list)
        self.logger = logging.getLogger(f'Worker-{worker_id}')
        
    def run(self):
        """工作进程主循环"""
        self.logger.info(f"工作进程 {self.worker_id} 启动")
        
        while True:
            try:
                # 获取任务
                task = self.task_manager.get_task()
                
                if task:
                    self.logger.info(f"处理任务: {task['task_id']}")
                    self.process_task(task)
                else:
                    # 没有任务时休眠
                    time.sleep(5)
                    
            except KeyboardInterrupt:
                self.logger.info("工作进程停止")
                break
            except Exception as e:
                self.logger.error(f"工作进程异常: {e}")
                time.sleep(10)
    
    def process_task(self, task):
        """处理单个抓取任务"""
        try:
            keyword = task['keyword']
            max_products = task['max_products']
            
            # 执行抓取
            products = self.scraper.search_products(keyword, max_results=max_products)
            
            # 保存结果
            self.task_manager.save_result(
                task['task_id'], 
                products, 
                'completed'
            )
            
            self.logger.info(f"任务 {task['task_id']} 完成,抓取 {len(products)} 个商品")
            
        except Exception as e:
            self.logger.error(f"任务处理失败: {e}")
            self.task_manager.save_result(
                task['task_id'], 
                [], 
                'failed'
            )
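
调度端与工作进程的配合方式大致如下(示意片段,假设本机已运行Redis,且 distributed/ 目录是可导入的包):

# 使用示例(示意):提交任务并启动一个工作进程
from distributed.task_manager import TaskManager
from distributed.worker import ScrapingWorker

# 调度端:批量提交关键词任务
manager = TaskManager()
for keyword in ["wireless headphones", "bluetooth speaker"]:
    task_id = manager.add_task(keyword, max_products=30, priority=1)
    print(f"已提交任务 {task_id}: {keyword}")

# 工作端:启动单个进程(run()会阻塞循环,生产环境可多进程/多机部署)
worker = ScrapingWorker(worker_id=1)
worker.run()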

3. 监控和告警系统

# monitoring/scraper_monitor.py
import psutil
import time
import smtplib
from email.mime.text import MIMEText
from datetime import datetime, timedelta

class ScraperMonitor:
    def __init__(self, email_config=None):
        self.email_config = email_config
        self.performance_log = []
        
    def monitor_performance(self):
        """监控系统性能"""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        disk_percent = psutil.disk_usage('/').percent
        
        performance_data = {
            'timestamp': datetime.now(),
            'cpu_percent': cpu_percent,
            'memory_percent': memory_percent,
            'disk_percent': disk_percent
        }
        
        self.performance_log.append(performance_data)
        
        # 检查是否需要告警
        if cpu_percent > 80 or memory_percent > 80:
            self.send_alert(f"系统资源使用率过高: CPU {cpu_percent}%, 内存 {memory_percent}%")
        
        return performance_data
    
    def send_alert(self, message):
        """发送告警邮件"""
        if not self.email_config:
            print(f"告警: {message}")
            return
        
        try:
            msg = MIMEText(f"沃尔玛爬虫系统告警\n\n{message}\n\n时间: {datetime.now()}")
            msg['Subject'] = '爬虫系统告警'
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            
            server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()
            
            print(f"告警邮件已发送: {message}")
            
        except Exception as e:
            print(f"发送告警邮件失败: {e}")
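
监控器可以放在后台线程中周期性采集指标,与爬虫主流程并行运行(示意片段):

# 使用示例(示意):后台线程每60秒采集一次性能指标
import threading
import time
from monitoring.scraper_monitor import ScraperMonitor

monitor = ScraperMonitor()  # 未配置邮件时,告警仅打印到控制台

def monitor_loop(interval=60):
    while True:
        data = monitor.monitor_performance()
        print(f"CPU {data['cpu_percent']}%, 内存 {data['memory_percent']}%")
        time.sleep(interval)

threading.Thread(target=monitor_loop, daemon=True).start()
# 之后继续执行爬虫主流程即可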

高级优化技巧

1. 智能重试机制

# utils/retry_handler.py
import time
import random
from functools import wraps

def smart_retry(max_retries=3, base_delay=1, backoff_factor=2, jitter=True):
    """智能重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    
                    if attempt < max_retries - 1:
                        # 计算延迟时间
                        delay = base_delay * (backoff_factor ** attempt)
                        
                        # 添加随机抖动
                        if jitter:
                            delay += random.uniform(0, delay * 0.1)
                        
                        print(f"重试 {attempt + 1}/{max_retries},{delay:.2f}秒后重试")
                        time.sleep(delay)
                    else:
                        print(f"所有重试都失败,最后异常: {e}")
            
            raise last_exception
        
        return wrapper
    return decorator
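
装饰器的用法如下(示意片段):

# 使用示例(示意):给易失败的请求函数加上指数退避重试
import requests
from utils.retry_handler import smart_retry

@smart_retry(max_retries=4, base_delay=2, backoff_factor=2)
def fetch_page(url):
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # 非2xx状态码抛出异常,从而触发重试
    return response.text

html = fetch_page("https://www.walmart.com/search?q=laptop")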

2. 数据去重和缓存

# utils/cache_manager.py
import hashlib
import json
import os
from datetime import datetime, timedelta

class CacheManager:
    def __init__(self, cache_dir='cache', expire_hours=24):
        self.cache_dir = cache_dir
        self.expire_hours = expire_hours
        os.makedirs(cache_dir, exist_ok=True)
    
    def get_cache_key(self, url):
        """生成缓存键"""
        return hashlib.md5(url.encode()).hexdigest()
    
    def get_cache_file(self, cache_key):
        """获取缓存文件路径"""
        return os.path.join(self.cache_dir, f"{cache_key}.json")
    
    def is_cache_valid(self, cache_file):
        """检查缓存是否有效"""
        if not os.path.exists(cache_file):
            return False
        
        file_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
        expire_time = datetime.now() - timedelta(hours=self.expire_hours)
        
        return file_time > expire_time
    
    def get_cached_data(self, url):
        """获取缓存数据"""
        cache_key = self.get_cache_key(url)
        cache_file = self.get_cache_file(cache_key)
        
        if self.is_cache_valid(cache_file):
            try:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception:
                pass
        
        return None
    
    def save_to_cache(self, url, data):
        """保存数据到缓存"""
        cache_key = self.get_cache_key(url)
        cache_file = self.get_cache_file(cache_key)
        
        try:
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"保存缓存失败: {e}")

class DataDeduplicator:
    def __init__(self):
        self.seen_products = set()
    
    def is_duplicate(self, product):
        """检查商品是否重复"""
        # 使用商品ID和标题创建唯一标识
        identifier = f"{product.get('product_id', '')}-{product.get('title', '')}"
        identifier_hash = hashlib.md5(identifier.encode()).hexdigest()
        
        if identifier_hash in self.seen_products:
            return True
        
        self.seen_products.add(identifier_hash)
        return False
    
    def deduplicate_products(self, products):
        """去重商品列表"""
        unique_products = []
        
        for product in products:
            if not self.is_duplicate(product):
                unique_products.append(product)
        
        print(f"去重前: {len(products)} 个商品,去重后: {len(unique_products)} 个商品")
        return unique_products
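
缓存与去重可以分别接入请求前和结果入库前两个环节(示意片段):

# 使用示例(示意):请求前查缓存,结果入库前去重
from scrapers.walmart_scraper import WalmartScraper
from utils.cache_manager import CacheManager, DataDeduplicator

scraper = WalmartScraper()
cache = CacheManager(cache_dir='cache', expire_hours=12)
deduplicator = DataDeduplicator()

url = "https://www.walmart.com/search?q=laptop"
products = cache.get_cached_data(url)
if products is None:
    products = scraper.search_products("laptop", max_results=20)
    cache.save_to_cache(url, products)  # 有效期内再次请求同一URL时直接复用

unique_products = deduplicator.deduplicate_products(products)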

性能优化与扩展

1. 异步并发处理

# async_scraper.py
import asyncio
import aiohttp
from aiohttp import ClientTimeout
import async_timeout

class AsyncWalmartScraper:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
    async def fetch_page(self, session, url):
        """异步获取页面"""
        async with self.semaphore:
            try:
                timeout = ClientTimeout(total=30)
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        print(f"HTTP错误 {response.status}: {url}")
            except Exception as e:
                print(f"请求失败: {e}")
            
            return None
    
    async def scrape_multiple_urls(self, urls):
        """并发抓取多个URL"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 过滤成功的结果
            successful_results = [r for r in results if isinstance(r, str)]
            print(f"成功抓取 {len(successful_results)}/{len(urls)} 个页面")
            
            return successful_results
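
异步抓取器的调用方式如下(示意片段,商品链接为虚构示例):

# 使用示例(示意):并发抓取一批商品详情页
import asyncio
from async_scraper import AsyncWalmartScraper

urls = [
    "https://www.walmart.com/ip/example-product-1",  # 虚构示例链接
    "https://www.walmart.com/ip/example-product-2",
]

scraper = AsyncWalmartScraper(max_concurrent=5)
pages = asyncio.run(scraper.scrape_multiple_urls(urls))
print(f"获得 {len(pages)} 个页面的HTML")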

实际应用场景示例

使用示例

# 基本使用
python main.py --keywords "wireless headphones" "bluetooth speaker" --max-products 30

# 使用代理
python main.py --keywords "laptop" --proxy-file proxies.txt --output-format both

# 大批量抓取
python main.py --keywords "electronics" "home garden" "sports" --max-products 100 --output-format json

配置代理文件示例 (proxies.txt)

192.168.1.100:8080
203.123.45.67:3128
104.248.63.15:30588
167.172.180.46:41258

为什么选择专业的API服务

虽然我们已经详细介绍了如何构建一个功能完善的沃尔玛爬虫系统,但在实际业务应用中,构建和维护自己的爬虫系统面临诸多挑战:

技术维护成本高:电商网站频繁更新反爬虫策略,需要持续投入技术资源进行适配和优化。

法律合规风险:不当的爬虫行为可能面临法律风险,需要专业的合规指导。

基础设施投入大:稳定的代理服务、验证码识别、分布式架构都需要大量资金投入。

数据质量保证难:确保数据的准确性、完整性和时效性需要专业的质量控制体系。

Pangolin Scrape API:专业的电商数据解决方案

如果您专注于沃尔玛运营和选品,希望将专业的数据采集工作交给专业团队,Pangolin Scrape API是您的理想选择。

核心优势

免维护智能解析:Pangolin Scrape API采用智能识别算法,自动适配沃尔玛等电商平台的页面结构变化,开发者无需关注DOM结构更新。

丰富的数据字段:支持抓取商品ID、图片、标题、评分、评论数、尺寸、颜色、描述、价格、库存状态等全面的商品信息。

多种调用方式:提供同步和异步两种API调用方式,满足不同业务场景需求。

快速集成示例

使用Pangolin Scrape API抓取沃尔玛商品信息非常简单:

import requests
import json

# 认证获取token
auth_url = "http://scrapeapi.pangolinfo.com/api/v1/auth"
auth_data = {
    "email": "[email protected]",
    "password": "your_password"
}

response = requests.post(auth_url, json=auth_data)
token = response.json()['data']

# 抓取沃尔玛商品详情
scrape_url = "http://scrapeapi.pangolinfo.com/api/v1"
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {token}"
}

scrape_data = {
    "url": "https://www.walmart.com/ip/your-product-url",
    "parserName": "walmProductDetail",
    "formats": ["json"]
}

result = requests.post(scrape_url, headers=headers, json=scrape_data)
product_data = result.json()

服务特色

  • 7×24小时稳定服务:专业运维团队保障服务稳定性
  • 智能反爬虫应对:内置IP轮换、请求头随机化等反检测机制
  • 数据质量保证:多重验证确保数据准确性和完整性
  • 灵活的输出格式:支持JSON、Markdown、原始HTML多种格式
  • 按需付费:根据实际使用量付费,降低成本

通过Pangolin Scrape API,您可以将更多精力投入到核心业务逻辑中,而无需担心复杂的技术实现和维护工作。

总结

本文全面介绍了如何使用Python构建一个专业级的沃尔玛爬虫系统,涵盖了从基础环境搭建到高级优化技巧的完整流程。我们详细讲解了应对反爬虫策略、数据处理、分布式架构等关键技术点,并提供了丰富的代码示例。

构建自己的爬虫系统虽然能够深度定制,但也面临着技术维护、合规风险、成本投入等诸多挑战。对于专注业务发展的企业而言,选择像Pangolin Scrape API这样的专业服务,能够更高效地获取所需数据,同时避免技术陷阱。

无论选择自建还是使用专业服务,关键是要根据自己的业务需求、技术能力和资源投入来做出明智的决策。数据驱动的电商时代,掌握准确、及时的市场信息就是掌握了竞争的主动权。

正如古人云:"工欲善其事,必先利其器"——选择合适的数据采集方案,让您在电商征途中事半功倍,决胜千里。
