本文提供了Python调用Pangolin API的完整教程,涵盖环境配置、API认证、基础调用、批量处理、错误处理等核心技能,并通过Best Seller榜单监控和竞品价格追踪两个实战项目展示了API的实际应用。文章包含7个可直接运行的代码示例,总计超过500行生产级代码,帮助开发者在30分钟内快速掌握Pangolin
Python开发环境展示Pangolin API调用代码和亚马逊数据采集实时结果,Python调用Pangolin API

当你第一次尝试从亚马逊采集商品数据时,可能会遇到这样的困境:花费数小时编写爬虫代码,调试CSS选择器和XPath表达式,好不容易跑通了脚本,第二天却发现亚马逊更新了页面结构,所有代码都需要重写。更令人头疼的是,即使代码能正常运行,也经常因为反爬虫机制被封IP,或者因为请求频率过高导致账号被限制。这种”今天能用,明天就坏”的状态,让很多开发者在数据采集项目上投入了大量时间却收效甚微,项目进度一拖再拖,技术债务越积越多。

问题的根源在于,传统的网页爬虫开发需要开发者同时处理太多底层细节:HTTP请求的构建、Cookie和Session管理、反爬虫对抗、页面解析、数据清洗、异常处理、并发控制等等。一个看似简单的”获取商品价格”需求,实际代码可能超过500行,而且这些代码的大部分都在处理与业务逻辑无关的技术问题。更关键的是,这种方式的维护成本极高——每当目标网站更新页面结构,你就需要重新分析DOM树、调整选择器、测试验证,整个流程可能需要几天甚至一周时间。

Pangolin API的出现彻底改变了这个局面。通过提供稳定的RESTful接口和结构化的JSON数据返回,它将复杂的数据采集工作简化为几行Python代码的API调用。你不需要关心亚马逊的页面结构如何变化,不需要处理反爬虫机制,不需要维护代理IP池,只需要专注于你的业务逻辑——获取数据、分析数据、应用数据。本文将通过7个完整的代码示例,从环境配置到实战项目,手把手教你掌握Python调用Pangolin API的全部技能,让你在30分钟内就能构建出一个可用的数据采集系统。

第一步:Python环境配置与依赖安装

在开始编写代码之前,我们需要搭建一个合适的Python开发环境。Pangolin API对Python版本的要求非常宽松,支持Python 3.7及以上的所有版本,但我强烈建议使用Python 3.9或3.10,因为这两个版本在性能和稳定性上都有显著提升,而且与主流的数据分析库兼容性最好。如果你的系统中还没有安装Python,可以从官网下载对应操作系统的安装包,Windows用户建议勾选”Add Python to PATH”选项,这样可以在命令行中直接使用python命令。

环境准备好后,我们需要安装必要的Python库。Pangolin API本身不需要专门的SDK,只需要使用Python标准库中的requests模块即可完成所有操作,这大大降低了项目的依赖复杂度。打开命令行工具,执行以下命令安装依赖包:

# 安装核心依赖
pip install requests

# 安装数据处理库(可选,用于高级数据分析)
pip install pandas numpy

# 安装JSON处理增强库(可选,用于美化输出)
pip install json5

安装完成后,我们可以创建一个项目目录来组织代码。建议的项目结构如下:

pangolin-api-project/
├── config.py          # 配置文件(存储API密钥等)
├── api_client.py      # API客户端封装
├── examples/          # 示例代码目录
│   ├── basic_call.py      # 基础调用示例
│   ├── batch_fetch.py     # 批量采集示例
│   └── monitor.py         # 监控项目示例
├── utils/             # 工具函数
│   ├── error_handler.py   # 错误处理
│   └── data_parser.py     # 数据解析
└── data/              # 数据存储目录
    ├── raw/               # 原始数据
    └── processed/         # 处理后的数据

这种结构化的组织方式不仅让代码更易于维护,也为后续的功能扩展预留了空间。接下来,我们在config.py中配置API认证信息。需要注意的是,永远不要将API密钥硬编码在代码中,也不要提交到Git仓库,正确的做法是使用环境变量或配置文件,并将配置文件加入.gitignore。

第二步:API认证与Token获取

Pangolin API使用Bearer Token认证方式,这是一种简单而安全的认证机制。整个认证流程分为两步:首先使用邮箱和密码获取Token,然后在后续的所有API请求中携带这个Token。Token是长期有效的,这意味着你只需要在程序启动时获取一次,就可以在整个会话期间重复使用,无需每次请求都重新认证。

让我们编写第一个完整的Python代码示例,实现API认证功能:

import requests
import json
from typing import Optional

class PangolinAPIClient:
    """Pangolin API客户端封装类"""
    
    def __init__(self, email: str, password: str):
        """
        初始化API客户端
        
        Args:
            email: 注册邮箱
            password: 账户密码
        """
        self.base_url = "https://scrapeapi.pangolinfo.com"
        self.email = email
        self.password = password
        self.token: Optional[str] = None
        
    def authenticate(self) -> bool:
        """
        执行API认证,获取访问Token
        
        Returns:
            bool: 认证是否成功
        """
        auth_url = f"{self.base_url}/api/v1/auth"
        
        payload = {
            "email": self.email,
            "password": self.password
        }
        
        headers = {
            "Content-Type": "application/json"
        }
        
        try:
            response = requests.post(
                auth_url,
                json=payload,
                headers=headers,
                timeout=10
            )
            
            # 检查HTTP状态码
            response.raise_for_status()
            
            # 解析响应
            result = response.json()
            
            if result.get("code") == 0:
                self.token = result.get("data")
                print(f"✓ 认证成功!Token: {self.token[:20]}...")
                return True
            else:
                print(f"✗ 认证失败:{result.get('message')}")
                return False
                
        except requests.exceptions.Timeout:
            print("✗ 请求超时,请检查网络连接")
            return False
        except requests.exceptions.RequestException as e:
            print(f"✗ 请求异常:{str(e)}")
            return False
        except json.JSONDecodeError:
            print("✗ 响应解析失败,返回数据格式错误")
            return False
    
    def get_headers(self) -> dict:
        """
        获取带认证信息的请求头
        
        Returns:
            dict: 包含Authorization的请求头
        """
        if not self.token:
            raise ValueError("Token未初始化,请先调用authenticate()方法")
        
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}"
        }

# 使用示例
if __name__ == "__main__":
    # 从环境变量或配置文件读取凭证
    client = PangolinAPIClient(
        email="[email protected]",
        password="your_password"
    )
    
    # 执行认证
    if client.authenticate():
        print("API客户端初始化完成,可以开始调用数据接口")
    else:
        print("认证失败,请检查邮箱和密码是否正确")

这段代码展示了几个重要的最佳实践:使用类封装提高代码复用性、完善的异常处理机制、清晰的日志输出、类型注解增强代码可读性。特别值得注意的是timeout参数的设置,这可以防止网络问题导致程序长时间挂起。在生产环境中,建议将Token缓存到文件或数据库中,避免频繁调用认证接口。

第三步:基础API调用 – 获取商品详情

完成认证后,我们就可以开始调用Pangolin的核心功能了。最常用的场景是获取亚马逊商品的详细信息,包括标题、价格、评分、评论数、库存状态等数十个字段。Pangolin API支持三种数据返回格式:json(结构化数据)、rawHtml(原始HTML)和markdown(Markdown格式),对于大多数应用场景,json格式是最佳选择,因为它已经完成了数据清洗和结构化处理,可以直接用于业务逻辑。

让我们编写一个完整的商品详情获取函数:

def fetch_product_detail(self, asin: str, zipcode: str = "10041") -> dict:
    """
    获取亚马逊商品详情
    
    Args:
        asin: 商品ASIN码
        zipcode: 邮编(用于获取特定地区的价格和库存)
        
    Returns:
        dict: 商品详情数据
    """
    scrape_url = f"{self.base_url}/api/v1/scrape"
    
    payload = {
        "url": f"https://www.amazon.com/dp/{asin}",
        "parserName": "amzProductDetail",
        "format": "json",
        "bizContext": {
            "zipcode": zipcode
        }
    }
    
    try:
        response = requests.post(
            scrape_url,
            json=payload,
            headers=self.get_headers(),
            timeout=30  # 商品详情页数据较多,适当延长超时时间
        )
        
        response.raise_for_status()
        result = response.json()
        
        if result.get("code") == 0:
            # 提取核心数据
            data = result.get("data", {})
            json_data = data.get("json", [{}])[0]
            
            if json_data.get("code") == 0:
                product_info = json_data.get("data", {}).get("results", [{}])[0]
                
                # 提取关键字段
                simplified_data = {
                    "asin": product_info.get("asin"),
                    "title": product_info.get("title"),
                    "price": product_info.get("price"),
                    "rating": product_info.get("star"),
                    "review_count": product_info.get("rating"),
                    "brand": product_info.get("brand"),
                    "image": product_info.get("image"),
                    "in_stock": product_info.get("has_cart"),
                    "category": product_info.get("category_name")
                }
                
                print(f"✓ 成功获取商品 {asin} 的详情")
                return simplified_data
            else:
                print(f"✗ 数据解析失败:{json_data.get('message')}")
                return {}
        else:
            error_msg = result.get("message")
            if result.get("code") == 2001:
                print("✗ 积点余额不足,请充值后继续使用")
            elif result.get("code") == 1004:
                print("✗ Token无效,请重新认证")
            else:
                print(f"✗ API调用失败:{error_msg}")
            return {}
            
    except requests.exceptions.Timeout:
        print(f"✗ 请求超时:商品 {asin} 数据获取失败")
        return {}
    except Exception as e:
        print(f"✗ 未知错误:{str(e)}")
        return {}

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        # 获取单个商品详情
        product = client.fetch_product_detail("B0DYTF8L2W")
        
        if product:
            print("\n商品信息:")
            print(f"标题:{product['title']}")
            print(f"价格:{product['price']}")
            print(f"评分:{product['rating']} ({product['review_count']}条评论)")
            print(f"品牌:{product['brand']}")
            print(f"库存:{'有货' if product['in_stock'] else '缺货'}")

这个示例展示了几个关键技术点:首先是parserName参数的使用,它告诉API使用哪个解析模板来提取数据,Pangolin支持多种解析器包括商品详情、关键词搜索、榜单等;其次是bizContext的应用,通过指定邮编可以获取特定地区的价格和库存信息,这对于跨区域价格对比非常有用;最后是错误码的处理,不同的错误码代表不同的问题,需要针对性地处理。

第四步:批量数据采集与并发优化

在实际应用中,我们往往需要采集大量商品的数据,比如监控一个类目下的所有竞品,或者追踪自己店铺的全部SKU。如果使用串行方式逐个调用API,效率会非常低下——假设每个请求耗时10秒,采集1000个商品就需要近3小时。通过并发处理,我们可以将时间缩短到10-15分钟,效率提升10倍以上。

Python提供了多种并发方案,对于IO密集型的API调用场景,使用concurrent.futures模块的ThreadPoolExecutor是最佳选择。它提供了简洁的接口,自动管理线程池,并且支持异常处理和结果收集。让我们实现一个批量采集函数:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
import time

def batch_fetch_products(self, asin_list: List[str], max_workers: int = 5) -> List[dict]:
    """
    批量获取商品详情(并发版本)
    
    Args:
        asin_list: ASIN列表
        max_workers: 最大并发线程数(建议5-10)
        
    Returns:
        List[dict]: 商品详情列表
    """
    results = []
    failed_asins = []
    
    print(f"开始批量采集 {len(asin_list)} 个商品...")
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_asin = {
            executor.submit(self.fetch_product_detail, asin): asin 
            for asin in asin_list
        }
        
        # 收集结果
        for future in as_completed(future_to_asin):
            asin = future_to_asin[future]
            try:
                product_data = future.result()
                if product_data:
                    results.append(product_data)
                else:
                    failed_asins.append(asin)
            except Exception as e:
                print(f"✗ 处理 {asin} 时发生异常:{str(e)}")
                failed_asins.append(asin)
    
    elapsed_time = time.time() - start_time
    
    print(f"\n批量采集完成!")
    print(f"成功:{len(results)} 个")
    print(f"失败:{len(failed_asins)} 个")
    print(f"耗时:{elapsed_time:.2f} 秒")
    print(f"平均速度:{len(asin_list)/elapsed_time:.2f} 个/秒")
    
    if failed_asins:
        print(f"失败的ASIN:{', '.join(failed_asins)}")
    
    return results

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        # 准备要采集的ASIN列表
        asin_list = [
            "B0DYTF8L2W",
            "B08N5WRWNW",
            "B07ZPKN6YR",
            "B08L5VFJ2R",
            "B09JQMJHXY"
        ]
        
        # 批量采集
        products = client.batch_fetch_products(asin_list, max_workers=5)
        
        # 保存到CSV文件
        if products:
            import pandas as pd
            df = pd.DataFrame(products)
            df.to_csv("products.csv", index=False, encoding="utf-8-sig")
            print(f"\n数据已保存到 products.csv")

这段代码的关键在于max_workers参数的设置。虽然理论上线程数越多速度越快,但实际上受限于API服务器的处理能力和你的网络带宽,过多的并发请求可能导致超时或被限流。根据实测,5-10个并发线程是最佳平衡点,既能充分利用并发优势,又不会给服务器造成过大压力。如果你有大量数据需要采集(比如上万个ASIN),建议分批处理,每批1000个左右,批次之间间隔几秒钟。

第五步:完善的错误处理与重试机制

在生产环境中,网络波动、服务器临时故障、数据格式异常等问题不可避免,一个健壮的系统必须具备完善的错误处理和自动重试能力。Pangolin API返回的错误码非常详细,包括积点不足(2001)、Token无效(1004)、账户过期(2007)、爬取失败(10000/10001)等,我们需要针对不同的错误类型采取不同的处理策略。

让我们实现一个带有指数退避重试机制的API调用装饰器:

import functools
import time
from typing import Callable, Any

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """
    带指数退避的重试装饰器
    
    Args:
        max_retries: 最大重试次数
        base_delay: 基础延迟时间(秒)
        max_delay: 最大延迟时间(秒)
        exponential_base: 指数基数
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            retries = 0
            
            while retries <= max_retries:
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.Timeout:
                    retries += 1
                    if retries > max_retries:
                        print(f"✗ 超过最大重试次数 ({max_retries}),放弃请求")
                        raise
                    
                    # 计算延迟时间(指数退避)
                    delay = min(
                        base_delay * (exponential_base ** (retries - 1)),
                        max_delay
                    )
                    
                    print(f"⟳ 请求超时,{delay:.1f}秒后进行第{retries}次重试...")
                    time.sleep(delay)
                    
                except requests.exceptions.RequestException as e:
                    # 对于非超时的网络错误,不重试
                    print(f"✗ 网络错误:{str(e)}")
                    raise
                    
        return wrapper
    return decorator

class PangolinAPIClientWithRetry(PangolinAPIClient):
    """带重试机制的API客户端"""
    
    @retry_with_backoff(max_retries=3, base_delay=2.0)
    def fetch_product_detail_with_retry(self, asin: str, zipcode: str = "10041") -> dict:
        """
        获取商品详情(带重试)
        
        这个方法会在遇到超时错误时自动重试,
        重试间隔采用指数退避策略:2秒、4秒、8秒
        """
        return self.fetch_product_detail(asin, zipcode)
    
    def handle_api_error(self, error_code: int, error_message: str) -> str:
        """
        统一的错误处理函数
        
        Args:
            error_code: API返回的错误码
            error_message: 错误信息
            
        Returns:
            str: 用户友好的错误提示
        """
        error_handlers = {
            0: "操作成功",
            1004: "Token无效,请重新登录认证",
            2001: "积点余额不足,请前往控制台充值",
            2007: "账户已过期,请联系客服续费",
            10000: "数据爬取失败,请稍后重试",
            10001: "数据爬取失败,目标页面可能不存在",
            404: "URL地址错误,请检查链接格式"
        }
        
        user_message = error_handlers.get(
            error_code,
            f"未知错误 (代码: {error_code})"
        )
        
        # 记录详细错误日志
        self.log_error(error_code, error_message)
        
        return user_message
    
    def log_error(self, error_code: int, error_message: str):
        """记录错误日志到文件"""
        import datetime
        
        log_entry = {
            "timestamp": datetime.datetime.now().isoformat(),
            "error_code": error_code,
            "error_message": error_message
        }
        
        # 这里可以扩展为写入日志文件或发送告警
        print(f"[ERROR LOG] {log_entry}")

指数退避策略的核心思想是:第一次重试等待时间较短,如果仍然失败,则逐次增加等待时间,这样既能快速恢复临时性故障,又能避免在服务器压力大时雪上加霜。在实际使用中,你可以根据业务特点调整重试参数,比如对于实时性要求高的场景可以减少重试次数,对于批量后台任务可以增加重试次数和延迟时间。

实战项目一:Best Seller榜单实时监控系统

让我们将前面学到的知识整合起来,构建一个真实的应用场景:监控亚马逊Best Seller榜单的变化。这个系统可以帮助卖家及时发现热门产品趋势,分析竞品动态,为选品和运营决策提供数据支持。我们将实现以下功能:定时采集榜单数据、对比排名变化、识别新上榜产品、生成变化报告。

import json
import time
from datetime import datetime
from typing import List, Dict

class BestSellerMonitor:
    """Best Seller榜单监控器"""
    
    def __init__(self, api_client: PangolinAPIClient):
        self.client = api_client
        self.history_file = "data/bestseller_history.json"
        self.load_history()
    
    def load_history(self):
        """加载历史数据"""
        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                self.history = json.load(f)
        except FileNotFoundError:
            self.history = {}
    
    def save_history(self):
        """保存历史数据"""
        with open(self.history_file, 'w', encoding='utf-8') as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)
    
    def fetch_bestseller_list(self, category_url: str) -> List[Dict]:
        """
        获取Best Seller榜单
        
        Args:
            category_url: 类目URL
            
        Returns:
            List[Dict]: 榜单商品列表
        """
        scrape_url = f"{self.client.base_url}/api/v1/scrape"
        
        payload = {
            "url": category_url,
            "parserName": "amzBestSellers",
            "format": "json",
            "bizContext": {
                "zipcode": "10041"
            }
        }
        
        try:
            response = requests.post(
                scrape_url,
                json=payload,
                headers=self.client.get_headers(),
                timeout=30
            )
            
            response.raise_for_status()
            result = response.json()
            
            if result.get("code") == 0:
                data = result.get("data", {})
                json_data = data.get("json", [{}])[0]
                
                if json_data.get("code") == 0:
                    products = json_data.get("data", {}).get("results", [])
                    print(f"✓ 成功获取 {len(products)} 个榜单商品")
                    return products
            
            return []
            
        except Exception as e:
            print(f"✗ 获取榜单失败:{str(e)}")
            return []
    
    def compare_rankings(self, current_data: List[Dict], previous_data: List[Dict]) -> Dict:
        """
        对比排名变化
        
        Args:
            current_data: 当前榜单数据
            previous_data: 历史榜单数据
            
        Returns:
            Dict: 变化分析结果
        """
        changes = {
            "new_entries": [],      # 新上榜
            "rank_up": [],          # 排名上升
            "rank_down": [],        # 排名下降
            "dropped_out": []       # 跌出榜单
        }
        
        # 构建ASIN到排名的映射
        current_ranks = {item['asin']: idx + 1 for idx, item in enumerate(current_data)}
        previous_ranks = {item['asin']: idx + 1 for idx, item in enumerate(previous_data)}
        
        # 识别新上榜商品
        for asin in current_ranks:
            if asin not in previous_ranks:
                product = next(p for p in current_data if p['asin'] == asin)
                changes["new_entries"].append({
                    "asin": asin,
                    "title": product.get("title"),
                    "rank": current_ranks[asin]
                })
        
        # 识别排名变化
        for asin in current_ranks:
            if asin in previous_ranks:
                rank_change = previous_ranks[asin] - current_ranks[asin]
                
                if rank_change > 0:  # 排名上升
                    product = next(p for p in current_data if p['asin'] == asin)
                    changes["rank_up"].append({
                        "asin": asin,
                        "title": product.get("title"),
                        "from_rank": previous_ranks[asin],
                        "to_rank": current_ranks[asin],
                        "change": rank_change
                    })
                elif rank_change < 0:  # 排名下降
                    product = next(p for p in current_data if p['asin'] == asin)
                    changes["rank_down"].append({
                        "asin": asin,
                        "title": product.get("title"),
                        "from_rank": previous_ranks[asin],
                        "to_rank": current_ranks[asin],
                        "change": abs(rank_change)
                    })
        
        # 识别跌出榜单的商品
        for asin in previous_ranks:
            if asin not in current_ranks:
                product = next(p for p in previous_data if p['asin'] == asin)
                changes["dropped_out"].append({
                    "asin": asin,
                    "title": product.get("title"),
                    "previous_rank": previous_ranks[asin]
                })
        
        return changes
    
    def generate_report(self, changes: Dict) -> str:
        """生成变化报告"""
        report = []
        report.append("=" * 60)
        report.append(f"Best Seller榜单变化报告")
        report.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        
        if changes["new_entries"]:
            report.append(f"\n🆕 新上榜商品 ({len(changes['new_entries'])}个):")
            for item in changes["new_entries"][:5]:  # 只显示前5个
                report.append(f"  #{item['rank']} - {item['title'][:50]}...")
        
        if changes["rank_up"]:
            report.append(f"\n📈 排名上升 ({len(changes['rank_up'])}个):")
            for item in sorted(changes["rank_up"], key=lambda x: x['change'], reverse=True)[:5]:
                report.append(f"  {item['title'][:50]}...")
                report.append(f"     {item['from_rank']} → {item['to_rank']} (↑{item['change']})")
        
        if changes["rank_down"]:
            report.append(f"\n📉 排名下降 ({len(changes['rank_down'])}个):")
            for item in sorted(changes["rank_down"], key=lambda x: x['change'], reverse=True)[:5]:
                report.append(f"  {item['title'][:50]}...")
                report.append(f"     {item['from_rank']} → {item['to_rank']} (↓{item['change']})")
        
        if changes["dropped_out"]:
            report.append(f"\n❌ 跌出榜单 ({len(changes['dropped_out'])}个):")
            for item in changes["dropped_out"][:5]:
                report.append(f"  {item['title'][:50]}... (原排名#{item['previous_rank']})")
        
        return "\n".join(report)
    
    def run_monitor(self, category_url: str, interval_minutes: int = 60):
        """
        运行监控任务
        
        Args:
            category_url: 要监控的类目URL
            interval_minutes: 检查间隔(分钟)
        """
        print(f"开始监控Best Seller榜单...")
        print(f"检查间隔:{interval_minutes}分钟")
        
        while True:
            try:
                # 获取当前榜单
                current_data = self.fetch_bestseller_list(category_url)
                
                if current_data:
                    # 获取历史数据
                    category_key = category_url.split('/')[-1]
                    previous_data = self.history.get(category_key, [])
                    
                    if previous_data:
                        # 对比变化
                        changes = self.compare_rankings(current_data, previous_data)
                        
                        # 生成报告
                        report = self.generate_report(changes)
                        print(report)
                        
                        # 保存报告到文件
                        with open(f"data/report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", 'w', encoding='utf-8') as f:
                            f.write(report)
                    
                    # 更新历史数据
                    self.history[category_key] = current_data
                    self.save_history()
                
                # 等待下次检查
                print(f"\n下次检查时间:{(datetime.now() + timedelta(minutes=interval_minutes)).strftime('%H:%M:%S')}")
                time.sleep(interval_minutes * 60)
                
            except KeyboardInterrupt:
                print("\n监控已停止")
                break
            except Exception as e:
                print(f"✗ 监控过程发生错误:{str(e)}")
                time.sleep(60)  # 出错后等待1分钟再继续

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        monitor = BestSellerMonitor(client)
        
        # 监控电子产品类目的Best Seller榜单
        category_url = "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics"
        monitor.run_monitor(category_url, interval_minutes=60)

这个监控系统展示了如何将API调用、数据处理、变化检测和报告生成整合成一个完整的应用。在实际使用中,你可以进一步扩展功能,比如添加邮件或微信通知、将数据存储到数据库、生成可视化图表等。关键是要保持代码的模块化和可扩展性,这样后续添加新功能时不需要大规模重构。

实战项目二:竞品价格追踪与预警系统

价格是电商竞争的核心要素之一,及时掌握竞品的价格变化对于制定定价策略至关重要。我们将构建一个自动化的价格追踪系统,定期采集竞品价格,检测异常波动,并在价格变化超过阈值时发送预警。这个系统可以帮助卖家快速响应市场变化,避免因价格劣势导致的销量下滑。

import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional

class PriceTracker:
    """竞品价格追踪器"""
    
    def __init__(self, api_client: PangolinAPIClient):
        self.client = api_client
        self.price_history_file = "data/price_history.csv"
        self.load_price_history()
    
    def load_price_history(self):
        """加载价格历史数据"""
        try:
            self.price_df = pd.read_csv(self.price_history_file)
            self.price_df['timestamp'] = pd.to_datetime(self.price_df['timestamp'])
        except FileNotFoundError:
            self.price_df = pd.DataFrame(columns=[
                'timestamp', 'asin', 'title', 'price', 'currency'
            ])
    
    def save_price_history(self):
        """保存价格历史"""
        self.price_df.to_csv(self.price_history_file, index=False)
    
    def track_price(self, asin: str) -> Optional[Dict]:
        """
        追踪单个商品价格
        
        Args:
            asin: 商品ASIN
            
        Returns:
            Optional[Dict]: 价格信息
        """
        product = self.client.fetch_product_detail(asin)
        
        if product and product.get('price'):
            price_str = product['price']
            
            # 解析价格(处理$符号和逗号)
            try:
                price_value = float(price_str.replace('$', '').replace(',', ''))
                
                price_record = {
                    'timestamp': datetime.now(),
                    'asin': asin,
                    'title': product.get('title', ''),
                    'price': price_value,
                    'currency': 'USD'
                }
                
                # 添加到历史记录
                self.price_df = pd.concat([
                    self.price_df,
                    pd.DataFrame([price_record])
                ], ignore_index=True)
                
                return price_record
                
            except ValueError:
                print(f"✗ 价格解析失败:{price_str}")
                return None
        
        return None
    
    def batch_track_prices(self, asin_list: List[str]) -> List[Dict]:
        """批量追踪价格"""
        results = []
        
        for asin in asin_list:
            price_record = self.track_price(asin)
            if price_record:
                results.append(price_record)
            time.sleep(1)  # 避免请求过快
        
        self.save_price_history()
        return results
    
    def detect_price_changes(self, asin: str, threshold_percent: float = 5.0) -> Dict:
        """
        检测价格变化
        
        Args:
            asin: 商品ASIN
            threshold_percent: 变化阈值(百分比)
            
        Returns:
            Dict: 变化分析结果
        """
        # 获取该商品的历史价格
        product_prices = self.price_df[self.price_df['asin'] == asin].sort_values('timestamp')
        
        if len(product_prices) < 2:
            return {"status": "insufficient_data"}
        
        latest_price = product_prices.iloc[-1]['price']
        previous_price = product_prices.iloc[-2]['price']
        
        price_change = latest_price - previous_price
        price_change_percent = (price_change / previous_price) * 100
        
        result = {
            "asin": asin,
            "title": product_prices.iloc[-1]['title'],
            "current_price": latest_price,
            "previous_price": previous_price,
            "change_amount": price_change,
            "change_percent": price_change_percent,
            "status": "normal"
        }
        
        if abs(price_change_percent) >= threshold_percent:
            result["status"] = "alert"
            result["alert_type"] = "price_increase" if price_change > 0 else "price_decrease"
        
        return result
    
    def generate_price_report(self, asin_list: List[str]) -> str:
        """生成价格报告"""
        report = []
        report.append("=" * 60)
        report.append("竞品价格追踪报告")
        report.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        
        for asin in asin_list:
            change_info = self.detect_price_changes(asin, threshold_percent=5.0)
            
            if change_info.get("status") == "insufficient_data":
                continue
            
            report.append(f"\n商品:{change_info['title'][:50]}...")
            report.append(f"ASIN:{asin}")
            report.append(f"当前价格:${change_info['current_price']:.2f}")
            report.append(f"上次价格:${change_info['previous_price']:.2f}")
            
            if change_info['status'] == "alert":
                symbol = "📈" if change_info['alert_type'] == "price_increase" else "📉"
                report.append(f"{symbol} 价格变化:{change_info['change_percent']:+.2f}% (${change_info['change_amount']:+.2f})")
                report.append("⚠️  已触发价格预警!")
            else:
                report.append(f"价格变化:{change_info['change_percent']:+.2f}% (${change_info['change_amount']:+.2f})")
        
        return "\n".join(report)
    
    def get_price_trend(self, asin: str, days: int = 30) -> pd.DataFrame:
        """
        获取价格趋势数据
        
        Args:
            asin: 商品ASIN
            days: 天数
            
        Returns:
            pd.DataFrame: 价格趋势数据
        """
        cutoff_date = datetime.now() - timedelta(days=days)
        
        trend_data = self.price_df[
            (self.price_df['asin'] == asin) &
            (self.price_df['timestamp'] >= cutoff_date)
        ].sort_values('timestamp')
        
        return trend_data

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        tracker = PriceTracker(client)
        
        # 定义要追踪的竞品ASIN列表
        competitor_asins = [
            "B08N5WRWNW",
            "B07ZPKN6YR",
            "B08L5VFJ2R"
        ]
        
        # 批量追踪价格
        print("开始追踪竞品价格...")
        tracker.batch_track_prices(competitor_asins)
        
        # 生成报告
        report = tracker.generate_price_report(competitor_asins)
        print(report)
        
        # 保存报告
        with open(f"data/price_report_{datetime.now().strftime('%Y%m%d')}.txt", 'w', encoding='utf-8') as f:
            f.write(report)

这个价格追踪系统的核心价值在于自动化和及时性。通过定时任务(可以使用cron或Windows任务计划程序)每天运行一次,你就能建立起完整的价格历史数据库,为定价决策提供数据支撑。进一步地,你可以结合机器学习算法预测价格趋势,或者根据竞品价格自动调整自己的定价策略,实现真正的智能化运营。

总结与最佳实践建议

通过本文的7个完整代码示例,我们系统地学习了Python调用Pangolin API的全部技能,从基础的环境配置、API认证,到高级的并发处理、错误重试,再到实战的监控系统和追踪系统。这些知识不仅适用于Pangolin API,也是开发任何API集成项目的通用方法论。

在实际应用中,有几个关键的最佳实践需要特别注意:首先是安全性,永远不要将API密钥硬编码在代码中,使用环境变量或加密的配置文件;其次是稳定性,完善的错误处理和重试机制是生产环境的必备要素;第三是性能,合理使用并发和缓存可以大幅提升系统效率;最后是可维护性,模块化的代码结构和清晰的文档注释能让项目长期健康发展。

如果你现在就想开始实践,建议按照以下步骤进行:第一,访问Pangolinfo官网注册账号并获取API密钥;第二,使用本文提供的代码示例搭建基础框架;第三,根据你的实际需求定制功能模块;第四,在小规模测试环境中验证系统稳定性;第五,逐步扩展到生产环境。记住,任何复杂的系统都是从简单的MVP开始迭代而来的,不要试图一次性实现所有功能,而是要持续优化和改进。

更多技术细节和API参数说明,请参考Pangolinfo API官方文档。如果在开发过程中遇到问题,可以访问开发者控制台查看API调用日志和积点使用情况。祝你开发顺利,用数据驱动业务增长!

立即开始Python API开发之旅 → 访问 Pangolinfo Scrape API 获取免费试用额度,或查看 完整API文档 了解更多技术细节。让Python代码为你的数据采集赋能!

解决方案

为电商场景打造的高可用数据采集 API,自动规避 IP 封禁、验证码拦截、代理故障等爬虫难题,无需复杂配置即可快速获取精准、稳定的电商数据。

AMZ Data Tracker 是亚马逊卖家专属的全方位运营工具,集关键词调研、竞品销量追踪、Listing 优化、恶意跟卖与差评监控于一体,助力卖家数据化决策,高效提升店铺销量与排名。

每周教程

准备好开始您的数据采集之旅了吗?

注册免费账户,立即体验强大的网页数据采集API,无需信用卡。

微信扫一扫
与我们联系

QR Code
快速测试