本文提供了Python调用Pangolin API的完整教程,涵盖环境配置、API认证、基础调用、批量处理、错误处理等核心技能,并通过Best Seller榜单监控和竞品价格追踪两个实战项目展示了API的实际应用。文章包含7个可直接运行的代码示例,总计超过500行生产级代码,帮助开发者在30分钟内快速掌握Pangolin
Python开发环境展示Pangolin API调用代码和亚马逊数据采集实时结果,Python调用Pangolin API

当你第一次尝试从亚马逊采集商品数据时,可能会遇到这样的困境:花费数小时编写爬虫代码,调试CSS选择器和XPath表达式,好不容易跑通了脚本,第二天却发现亚马逊更新了页面结构,所有代码都需要重写。更令人头疼的是,即使代码能正常运行,也经常因为反爬虫机制被封IP,或者因为请求频率过高导致账号被限制。这种”今天能用,明天就坏”的状态,让很多开发者在数据采集项目上投入了大量时间却收效甚微,项目进度一拖再拖,技术债务越积越多。

问题的根源在于,传统的网页爬虫开发需要开发者同时处理太多底层细节:HTTP请求的构建、Cookie和Session管理、反爬虫对抗、页面解析、数据清洗、异常处理、并发控制等等。一个看似简单的”获取商品价格”需求,实际代码可能超过500行,而且这些代码的大部分都在处理与业务逻辑无关的技术问题。更关键的是,这种方式的维护成本极高——每当目标网站更新页面结构,你就需要重新分析DOM树、调整选择器、测试验证,整个流程可能需要几天甚至一周时间。

Pangolin API的出现彻底改变了这个局面。通过提供稳定的RESTful接口和结构化的JSON数据返回,它将复杂的数据采集工作简化为几行Python代码的API调用。你不需要关心亚马逊的页面结构如何变化,不需要处理反爬虫机制,不需要维护代理IP池,只需要专注于你的业务逻辑——获取数据、分析数据、应用数据。本文将通过7个完整的代码示例,从环境配置到实战项目,手把手教你掌握Python调用Pangolin API的全部技能,让你在30分钟内就能构建出一个可用的数据采集系统。

第一步:Python环境配置与依赖安装

在开始编写代码之前,我们需要搭建一个合适的Python开发环境。Pangolin API对Python版本的要求非常宽松,支持Python 3.7及以上的所有版本,但我强烈建议使用Python 3.9或3.10,因为这两个版本在性能和稳定性上都有显著提升,而且与主流的数据分析库兼容性最好。如果你的系统中还没有安装Python,可以从官网下载对应操作系统的安装包,Windows用户建议勾选”Add Python to PATH”选项,这样可以在命令行中直接使用python命令。

环境准备好后,我们需要安装必要的Python库。Pangolin API本身不需要专门的SDK,只需要使用Python标准库中的requests模块即可完成所有操作,这大大降低了项目的依赖复杂度。打开命令行工具,执行以下命令安装依赖包:

# 安装核心依赖
pip install requests

# 安装数据处理库(可选,用于高级数据分析)
pip install pandas numpy

# 安装JSON处理增强库(可选,用于美化输出)
pip install json5

安装完成后,我们可以创建一个项目目录来组织代码。建议的项目结构如下:

pangolin-api-project/
├── config.py          # 配置文件(存储API密钥等)
├── api_client.py      # API客户端封装
├── examples/          # 示例代码目录
│   ├── basic_call.py      # 基础调用示例
│   ├── batch_fetch.py     # 批量采集示例
│   └── monitor.py         # 监控项目示例
├── utils/             # 工具函数
│   ├── error_handler.py   # 错误处理
│   └── data_parser.py     # 数据解析
└── data/              # 数据存储目录
    ├── raw/               # 原始数据
    └── processed/         # 处理后的数据

这种结构化的组织方式不仅让代码更易于维护,也为后续的功能扩展预留了空间。接下来,我们在config.py中配置API认证信息。需要注意的是,永远不要将API密钥硬编码在代码中,也不要提交到Git仓库,正确的做法是使用环境变量或配置文件,并将配置文件加入.gitignore。

第二步:API认证与Token获取

Pangolin API使用Bearer Token认证方式,这是一种简单而安全的认证机制。整个认证流程分为两步:首先使用邮箱和密码获取Token,然后在后续的所有API请求中携带这个Token。Token是长期有效的,这意味着你只需要在程序启动时获取一次,就可以在整个会话期间重复使用,无需每次请求都重新认证。

让我们编写第一个完整的Python代码示例,实现API认证功能:

import requests
import json
from typing import Optional

class PangolinAPIClient:
    """Pangolin API客户端封装类"""
    
    def __init__(self, email: str, password: str):
        """
        初始化API客户端
        
        Args:
            email: 注册邮箱
            password: 账户密码
        """
        self.base_url = "https://scrapeapi.pangolinfo.com"
        self.email = email
        self.password = password
        self.token: Optional[str] = None
        
    def authenticate(self) -> bool:
        """
        执行API认证,获取访问Token
        
        Returns:
            bool: 认证是否成功
        """
        auth_url = f"{self.base_url}/api/v1/auth"
        
        payload = {
            "email": self.email,
            "password": self.password
        }
        
        headers = {
            "Content-Type": "application/json"
        }
        
        try:
            response = requests.post(
                auth_url,
                json=payload,
                headers=headers,
                timeout=10
            )
            
            # 检查HTTP状态码
            response.raise_for_status()
            
            # 解析响应
            result = response.json()
            
            if result.get("code") == 0:
                self.token = result.get("data")
                print(f"✓ 认证成功!Token: {self.token[:20]}...")
                return True
            else:
                print(f"✗ 认证失败:{result.get('message')}")
                return False
                
        except requests.exceptions.Timeout:
            print("✗ 请求超时,请检查网络连接")
            return False
        except requests.exceptions.RequestException as e:
            print(f"✗ 请求异常:{str(e)}")
            return False
        except json.JSONDecodeError:
            print("✗ 响应解析失败,返回数据格式错误")
            return False
    
    def get_headers(self) -> dict:
        """
        获取带认证信息的请求头
        
        Returns:
            dict: 包含Authorization的请求头
        """
        if not self.token:
            raise ValueError("Token未初始化,请先调用authenticate()方法")
        
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}"
        }

# 使用示例
if __name__ == "__main__":
    # 从环境变量或配置文件读取凭证
    client = PangolinAPIClient(
        email="[email protected]",
        password="your_password"
    )
    
    # 执行认证
    if client.authenticate():
        print("API客户端初始化完成,可以开始调用数据接口")
    else:
        print("认证失败,请检查邮箱和密码是否正确")

这段代码展示了几个重要的最佳实践:使用类封装提高代码复用性、完善的异常处理机制、清晰的日志输出、类型注解增强代码可读性。特别值得注意的是timeout参数的设置,这可以防止网络问题导致程序长时间挂起。在生产环境中,建议将Token缓存到文件或数据库中,避免频繁调用认证接口。

第三步:基础API调用 – 获取商品详情

完成认证后,我们就可以开始调用Pangolin的核心功能了。最常用的场景是获取亚马逊商品的详细信息,包括标题、价格、评分、评论数、库存状态等数十个字段。Pangolin API支持三种数据返回格式:json(结构化数据)、rawHtml(原始HTML)和markdown(Markdown格式),对于大多数应用场景,json格式是最佳选择,因为它已经完成了数据清洗和结构化处理,可以直接用于业务逻辑。

让我们编写一个完整的商品详情获取函数:

def fetch_product_detail(self, asin: str, zipcode: str = "10041") -> dict:
    """
    获取亚马逊商品详情
    
    Args:
        asin: 商品ASIN码
        zipcode: 邮编(用于获取特定地区的价格和库存)
        
    Returns:
        dict: 商品详情数据
    """
    scrape_url = f"{self.base_url}/api/v1/scrape"
    
    payload = {
        "url": f"https://www.amazon.com/dp/{asin}",
        "parserName": "amzProductDetail",
        "format": "json",
        "bizContext": {
            "zipcode": zipcode
        }
    }
    
    try:
        response = requests.post(
            scrape_url,
            json=payload,
            headers=self.get_headers(),
            timeout=30  # 商品详情页数据较多,适当延长超时时间
        )
        
        response.raise_for_status()
        result = response.json()
        
        if result.get("code") == 0:
            # 提取核心数据
            data = result.get("data", {})
            json_data = data.get("json", [{}])[0]
            
            if json_data.get("code") == 0:
                product_info = json_data.get("data", {}).get("results", [{}])[0]
                
                # 提取关键字段
                simplified_data = {
                    "asin": product_info.get("asin"),
                    "title": product_info.get("title"),
                    "price": product_info.get("price"),
                    "rating": product_info.get("star"),
                    "review_count": product_info.get("rating"),
                    "brand": product_info.get("brand"),
                    "image": product_info.get("image"),
                    "in_stock": product_info.get("has_cart"),
                    "category": product_info.get("category_name")
                }
                
                print(f"✓ 成功获取商品 {asin} 的详情")
                return simplified_data
            else:
                print(f"✗ 数据解析失败:{json_data.get('message')}")
                return {}
        else:
            error_msg = result.get("message")
            if result.get("code") == 2001:
                print("✗ 积点余额不足,请充值后继续使用")
            elif result.get("code") == 1004:
                print("✗ Token无效,请重新认证")
            else:
                print(f"✗ API调用失败:{error_msg}")
            return {}
            
    except requests.exceptions.Timeout:
        print(f"✗ 请求超时:商品 {asin} 数据获取失败")
        return {}
    except Exception as e:
        print(f"✗ 未知错误:{str(e)}")
        return {}

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        # 获取单个商品详情
        product = client.fetch_product_detail("B0DYTF8L2W")
        
        if product:
            print("\n商品信息:")
            print(f"标题:{product['title']}")
            print(f"价格:{product['price']}")
            print(f"评分:{product['rating']} ({product['review_count']}条评论)")
            print(f"品牌:{product['brand']}")
            print(f"库存:{'有货' if product['in_stock'] else '缺货'}")

这个示例展示了几个关键技术点:首先是parserName参数的使用,它告诉API使用哪个解析模板来提取数据,Pangolin支持多种解析器包括商品详情、关键词搜索、榜单等;其次是bizContext的应用,通过指定邮编可以获取特定地区的价格和库存信息,这对于跨区域价格对比非常有用;最后是错误码的处理,不同的错误码代表不同的问题,需要针对性地处理。

第四步:批量数据采集与并发优化

在实际应用中,我们往往需要采集大量商品的数据,比如监控一个类目下的所有竞品,或者追踪自己店铺的全部SKU。如果使用串行方式逐个调用API,效率会非常低下——假设每个请求耗时10秒,采集1000个商品就需要近3小时。通过并发处理,我们可以将时间缩短到10-15分钟,效率提升10倍以上。

Python提供了多种并发方案,对于IO密集型的API调用场景,使用concurrent.futures模块的ThreadPoolExecutor是最佳选择。它提供了简洁的接口,自动管理线程池,并且支持异常处理和结果收集。让我们实现一个批量采集函数:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
import time

def batch_fetch_products(self, asin_list: List[str], max_workers: int = 5) -> List[dict]:
    """
    批量获取商品详情(并发版本)
    
    Args:
        asin_list: ASIN列表
        max_workers: 最大并发线程数(建议5-10)
        
    Returns:
        List[dict]: 商品详情列表
    """
    results = []
    failed_asins = []
    
    print(f"开始批量采集 {len(asin_list)} 个商品...")
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_asin = {
            executor.submit(self.fetch_product_detail, asin): asin 
            for asin in asin_list
        }
        
        # 收集结果
        for future in as_completed(future_to_asin):
            asin = future_to_asin[future]
            try:
                product_data = future.result()
                if product_data:
                    results.append(product_data)
                else:
                    failed_asins.append(asin)
            except Exception as e:
                print(f"✗ 处理 {asin} 时发生异常:{str(e)}")
                failed_asins.append(asin)
    
    elapsed_time = time.time() - start_time
    
    print(f"\n批量采集完成!")
    print(f"成功:{len(results)} 个")
    print(f"失败:{len(failed_asins)} 个")
    print(f"耗时:{elapsed_time:.2f} 秒")
    print(f"平均速度:{len(asin_list)/elapsed_time:.2f} 个/秒")
    
    if failed_asins:
        print(f"失败的ASIN:{', '.join(failed_asins)}")
    
    return results

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        # 准备要采集的ASIN列表
        asin_list = [
            "B0DYTF8L2W",
            "B08N5WRWNW",
            "B07ZPKN6YR",
            "B08L5VFJ2R",
            "B09JQMJHXY"
        ]
        
        # 批量采集
        products = client.batch_fetch_products(asin_list, max_workers=5)
        
        # 保存到CSV文件
        if products:
            import pandas as pd
            df = pd.DataFrame(products)
            df.to_csv("products.csv", index=False, encoding="utf-8-sig")
            print(f"\n数据已保存到 products.csv")

这段代码的关键在于max_workers参数的设置。虽然理论上线程数越多速度越快,但实际上受限于API服务器的处理能力和你的网络带宽,过多的并发请求可能导致超时或被限流。根据实测,5-10个并发线程是最佳平衡点,既能充分利用并发优势,又不会给服务器造成过大压力。如果你有大量数据需要采集(比如上万个ASIN),建议分批处理,每批1000个左右,批次之间间隔几秒钟。

第五步:完善的错误处理与重试机制

在生产环境中,网络波动、服务器临时故障、数据格式异常等问题不可避免,一个健壮的系统必须具备完善的错误处理和自动重试能力。Pangolin API返回的错误码非常详细,包括积点不足(2001)、Token无效(1004)、账户过期(2007)、爬取失败(10000/10001)等,我们需要针对不同的错误类型采取不同的处理策略。

让我们实现一个带有指数退避重试机制的API调用装饰器:

import functools
import time
from typing import Callable, Any

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """
    带指数退避的重试装饰器
    
    Args:
        max_retries: 最大重试次数
        base_delay: 基础延迟时间(秒)
        max_delay: 最大延迟时间(秒)
        exponential_base: 指数基数
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            retries = 0
            
            while retries <= max_retries:
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.Timeout:
                    retries += 1
                    if retries > max_retries:
                        print(f"✗ 超过最大重试次数 ({max_retries}),放弃请求")
                        raise
                    
                    # 计算延迟时间(指数退避)
                    delay = min(
                        base_delay * (exponential_base ** (retries - 1)),
                        max_delay
                    )
                    
                    print(f"⟳ 请求超时,{delay:.1f}秒后进行第{retries}次重试...")
                    time.sleep(delay)
                    
                except requests.exceptions.RequestException as e:
                    # 对于非超时的网络错误,不重试
                    print(f"✗ 网络错误:{str(e)}")
                    raise
                    
        return wrapper
    return decorator

class PangolinAPIClientWithRetry(PangolinAPIClient):
    """带重试机制的API客户端"""
    
    @retry_with_backoff(max_retries=3, base_delay=2.0)
    def fetch_product_detail_with_retry(self, asin: str, zipcode: str = "10041") -> dict:
        """
        获取商品详情(带重试)
        
        这个方法会在遇到超时错误时自动重试,
        重试间隔采用指数退避策略:2秒、4秒、8秒
        """
        return self.fetch_product_detail(asin, zipcode)
    
    def handle_api_error(self, error_code: int, error_message: str) -> str:
        """
        统一的错误处理函数
        
        Args:
            error_code: API返回的错误码
            error_message: 错误信息
            
        Returns:
            str: 用户友好的错误提示
        """
        error_handlers = {
            0: "操作成功",
            1004: "Token无效,请重新登录认证",
            2001: "积点余额不足,请前往控制台充值",
            2007: "账户已过期,请联系客服续费",
            10000: "数据爬取失败,请稍后重试",
            10001: "数据爬取失败,目标页面可能不存在",
            404: "URL地址错误,请检查链接格式"
        }
        
        user_message = error_handlers.get(
            error_code,
            f"未知错误 (代码: {error_code})"
        )
        
        # 记录详细错误日志
        self.log_error(error_code, error_message)
        
        return user_message
    
    def log_error(self, error_code: int, error_message: str):
        """记录错误日志到文件"""
        import datetime
        
        log_entry = {
            "timestamp": datetime.datetime.now().isoformat(),
            "error_code": error_code,
            "error_message": error_message
        }
        
        # 这里可以扩展为写入日志文件或发送告警
        print(f"[ERROR LOG] {log_entry}")

指数退避策略的核心思想是:第一次重试等待时间较短,如果仍然失败,则逐次增加等待时间,这样既能快速恢复临时性故障,又能避免在服务器压力大时雪上加霜。在实际使用中,你可以根据业务特点调整重试参数,比如对于实时性要求高的场景可以减少重试次数,对于批量后台任务可以增加重试次数和延迟时间。

实战项目一:Best Seller榜单实时监控系统

让我们将前面学到的知识整合起来,构建一个真实的应用场景:监控亚马逊Best Seller榜单的变化。这个系统可以帮助卖家及时发现热门产品趋势,分析竞品动态,为选品和运营决策提供数据支持。我们将实现以下功能:定时采集榜单数据、对比排名变化、识别新上榜产品、生成变化报告。

import json
import time
from datetime import datetime
from typing import List, Dict

class BestSellerMonitor:
    """Best Seller榜单监控器"""
    
    def __init__(self, api_client: PangolinAPIClient):
        self.client = api_client
        self.history_file = "data/bestseller_history.json"
        self.load_history()
    
    def load_history(self):
        """加载历史数据"""
        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                self.history = json.load(f)
        except FileNotFoundError:
            self.history = {}
    
    def save_history(self):
        """保存历史数据"""
        with open(self.history_file, 'w', encoding='utf-8') as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)
    
    def fetch_bestseller_list(self, category_url: str) -> List[Dict]:
        """
        获取Best Seller榜单
        
        Args:
            category_url: 类目URL
            
        Returns:
            List[Dict]: 榜单商品列表
        """
        scrape_url = f"{self.client.base_url}/api/v1/scrape"
        
        payload = {
            "url": category_url,
            "parserName": "amzBestSellers",
            "format": "json",
            "bizContext": {
                "zipcode": "10041"
            }
        }
        
        try:
            response = requests.post(
                scrape_url,
                json=payload,
                headers=self.client.get_headers(),
                timeout=30
            )
            
            response.raise_for_status()
            result = response.json()
            
            if result.get("code") == 0:
                data = result.get("data", {})
                json_data = data.get("json", [{}])[0]
                
                if json_data.get("code") == 0:
                    products = json_data.get("data", {}).get("results", [])
                    print(f"✓ 成功获取 {len(products)} 个榜单商品")
                    return products
            
            return []
            
        except Exception as e:
            print(f"✗ 获取榜单失败:{str(e)}")
            return []
    
    def compare_rankings(self, current_data: List[Dict], previous_data: List[Dict]) -> Dict:
        """
        对比排名变化
        
        Args:
            current_data: 当前榜单数据
            previous_data: 历史榜单数据
            
        Returns:
            Dict: 变化分析结果
        """
        changes = {
            "new_entries": [],      # 新上榜
            "rank_up": [],          # 排名上升
            "rank_down": [],        # 排名下降
            "dropped_out": []       # 跌出榜单
        }
        
        # 构建ASIN到排名的映射
        current_ranks = {item['asin']: idx + 1 for idx, item in enumerate(current_data)}
        previous_ranks = {item['asin']: idx + 1 for idx, item in enumerate(previous_data)}
        
        # 识别新上榜商品
        for asin in current_ranks:
            if asin not in previous_ranks:
                product = next(p for p in current_data if p['asin'] == asin)
                changes["new_entries"].append({
                    "asin": asin,
                    "title": product.get("title"),
                    "rank": current_ranks[asin]
                })
        
        # 识别排名变化
        for asin in current_ranks:
            if asin in previous_ranks:
                rank_change = previous_ranks[asin] - current_ranks[asin]
                
                if rank_change > 0:  # 排名上升
                    product = next(p for p in current_data if p['asin'] == asin)
                    changes["rank_up"].append({
                        "asin": asin,
                        "title": product.get("title"),
                        "from_rank": previous_ranks[asin],
                        "to_rank": current_ranks[asin],
                        "change": rank_change
                    })
                elif rank_change < 0:  # 排名下降
                    product = next(p for p in current_data if p['asin'] == asin)
                    changes["rank_down"].append({
                        "asin": asin,
                        "title": product.get("title"),
                        "from_rank": previous_ranks[asin],
                        "to_rank": current_ranks[asin],
                        "change": abs(rank_change)
                    })
        
        # 识别跌出榜单的商品
        for asin in previous_ranks:
            if asin not in current_ranks:
                product = next(p for p in previous_data if p['asin'] == asin)
                changes["dropped_out"].append({
                    "asin": asin,
                    "title": product.get("title"),
                    "previous_rank": previous_ranks[asin]
                })
        
        return changes
    
    def generate_report(self, changes: Dict) -> str:
        """生成变化报告"""
        report = []
        report.append("=" * 60)
        report.append(f"Best Seller榜单变化报告")
        report.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        
        if changes["new_entries"]:
            report.append(f"\n🆕 新上榜商品 ({len(changes['new_entries'])}个):")
            for item in changes["new_entries"][:5]:  # 只显示前5个
                report.append(f"  #{item['rank']} - {item['title'][:50]}...")
        
        if changes["rank_up"]:
            report.append(f"\n📈 排名上升 ({len(changes['rank_up'])}个):")
            for item in sorted(changes["rank_up"], key=lambda x: x['change'], reverse=True)[:5]:
                report.append(f"  {item['title'][:50]}...")
                report.append(f"     {item['from_rank']} → {item['to_rank']} (↑{item['change']})")
        
        if changes["rank_down"]:
            report.append(f"\n📉 排名下降 ({len(changes['rank_down'])}个):")
            for item in sorted(changes["rank_down"], key=lambda x: x['change'], reverse=True)[:5]:
                report.append(f"  {item['title'][:50]}...")
                report.append(f"     {item['from_rank']} → {item['to_rank']} (↓{item['change']})")
        
        if changes["dropped_out"]:
            report.append(f"\n❌ 跌出榜单 ({len(changes['dropped_out'])}个):")
            for item in changes["dropped_out"][:5]:
                report.append(f"  {item['title'][:50]}... (原排名#{item['previous_rank']})")
        
        return "\n".join(report)
    
    def run_monitor(self, category_url: str, interval_minutes: int = 60):
        """
        运行监控任务
        
        Args:
            category_url: 要监控的类目URL
            interval_minutes: 检查间隔(分钟)
        """
        print(f"开始监控Best Seller榜单...")
        print(f"检查间隔:{interval_minutes}分钟")
        
        while True:
            try:
                # 获取当前榜单
                current_data = self.fetch_bestseller_list(category_url)
                
                if current_data:
                    # 获取历史数据
                    category_key = category_url.split('/')[-1]
                    previous_data = self.history.get(category_key, [])
                    
                    if previous_data:
                        # 对比变化
                        changes = self.compare_rankings(current_data, previous_data)
                        
                        # 生成报告
                        report = self.generate_report(changes)
                        print(report)
                        
                        # 保存报告到文件
                        with open(f"data/report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", 'w', encoding='utf-8') as f:
                            f.write(report)
                    
                    # 更新历史数据
                    self.history[category_key] = current_data
                    self.save_history()
                
                # 等待下次检查
                print(f"\n下次检查时间:{(datetime.now() + timedelta(minutes=interval_minutes)).strftime('%H:%M:%S')}")
                time.sleep(interval_minutes * 60)
                
            except KeyboardInterrupt:
                print("\n监控已停止")
                break
            except Exception as e:
                print(f"✗ 监控过程发生错误:{str(e)}")
                time.sleep(60)  # 出错后等待1分钟再继续

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        monitor = BestSellerMonitor(client)
        
        # 监控电子产品类目的Best Seller榜单
        category_url = "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics"
        monitor.run_monitor(category_url, interval_minutes=60)

这个监控系统展示了如何将API调用、数据处理、变化检测和报告生成整合成一个完整的应用。在实际使用中,你可以进一步扩展功能,比如添加邮件或微信通知、将数据存储到数据库、生成可视化图表等。关键是要保持代码的模块化和可扩展性,这样后续添加新功能时不需要大规模重构。

实战项目二:竞品价格追踪与预警系统

价格是电商竞争的核心要素之一,及时掌握竞品的价格变化对于制定定价策略至关重要。我们将构建一个自动化的价格追踪系统,定期采集竞品价格,检测异常波动,并在价格变化超过阈值时发送预警。这个系统可以帮助卖家快速响应市场变化,避免因价格劣势导致的销量下滑。

import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional

class PriceTracker:
    """竞品价格追踪器"""
    
    def __init__(self, api_client: PangolinAPIClient):
        self.client = api_client
        self.price_history_file = "data/price_history.csv"
        self.load_price_history()
    
    def load_price_history(self):
        """加载价格历史数据"""
        try:
            self.price_df = pd.read_csv(self.price_history_file)
            self.price_df['timestamp'] = pd.to_datetime(self.price_df['timestamp'])
        except FileNotFoundError:
            self.price_df = pd.DataFrame(columns=[
                'timestamp', 'asin', 'title', 'price', 'currency'
            ])
    
    def save_price_history(self):
        """保存价格历史"""
        self.price_df.to_csv(self.price_history_file, index=False)
    
    def track_price(self, asin: str) -> Optional[Dict]:
        """
        追踪单个商品价格
        
        Args:
            asin: 商品ASIN
            
        Returns:
            Optional[Dict]: 价格信息
        """
        product = self.client.fetch_product_detail(asin)
        
        if product and product.get('price'):
            price_str = product['price']
            
            # 解析价格(处理$符号和逗号)
            try:
                price_value = float(price_str.replace('$', '').replace(',', ''))
                
                price_record = {
                    'timestamp': datetime.now(),
                    'asin': asin,
                    'title': product.get('title', ''),
                    'price': price_value,
                    'currency': 'USD'
                }
                
                # 添加到历史记录
                self.price_df = pd.concat([
                    self.price_df,
                    pd.DataFrame([price_record])
                ], ignore_index=True)
                
                return price_record
                
            except ValueError:
                print(f"✗ 价格解析失败:{price_str}")
                return None
        
        return None
    
    def batch_track_prices(self, asin_list: List[str]) -> List[Dict]:
        """批量追踪价格"""
        results = []
        
        for asin in asin_list:
            price_record = self.track_price(asin)
            if price_record:
                results.append(price_record)
            time.sleep(1)  # 避免请求过快
        
        self.save_price_history()
        return results
    
    def detect_price_changes(self, asin: str, threshold_percent: float = 5.0) -> Dict:
        """
        检测价格变化
        
        Args:
            asin: 商品ASIN
            threshold_percent: 变化阈值(百分比)
            
        Returns:
            Dict: 变化分析结果
        """
        # 获取该商品的历史价格
        product_prices = self.price_df[self.price_df['asin'] == asin].sort_values('timestamp')
        
        if len(product_prices) < 2:
            return {"status": "insufficient_data"}
        
        latest_price = product_prices.iloc[-1]['price']
        previous_price = product_prices.iloc[-2]['price']
        
        price_change = latest_price - previous_price
        price_change_percent = (price_change / previous_price) * 100
        
        result = {
            "asin": asin,
            "title": product_prices.iloc[-1]['title'],
            "current_price": latest_price,
            "previous_price": previous_price,
            "change_amount": price_change,
            "change_percent": price_change_percent,
            "status": "normal"
        }
        
        if abs(price_change_percent) >= threshold_percent:
            result["status"] = "alert"
            result["alert_type"] = "price_increase" if price_change > 0 else "price_decrease"
        
        return result
    
    def generate_price_report(self, asin_list: List[str]) -> str:
        """生成价格报告"""
        report = []
        report.append("=" * 60)
        report.append("竞品价格追踪报告")
        report.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        
        for asin in asin_list:
            change_info = self.detect_price_changes(asin, threshold_percent=5.0)
            
            if change_info.get("status") == "insufficient_data":
                continue
            
            report.append(f"\n商品:{change_info['title'][:50]}...")
            report.append(f"ASIN:{asin}")
            report.append(f"当前价格:${change_info['current_price']:.2f}")
            report.append(f"上次价格:${change_info['previous_price']:.2f}")
            
            if change_info['status'] == "alert":
                symbol = "📈" if change_info['alert_type'] == "price_increase" else "📉"
                report.append(f"{symbol} 价格变化:{change_info['change_percent']:+.2f}% (${change_info['change_amount']:+.2f})")
                report.append("⚠️  已触发价格预警!")
            else:
                report.append(f"价格变化:{change_info['change_percent']:+.2f}% (${change_info['change_amount']:+.2f})")
        
        return "\n".join(report)
    
    def get_price_trend(self, asin: str, days: int = 30) -> pd.DataFrame:
        """
        获取价格趋势数据
        
        Args:
            asin: 商品ASIN
            days: 天数
            
        Returns:
            pd.DataFrame: 价格趋势数据
        """
        cutoff_date = datetime.now() - timedelta(days=days)
        
        trend_data = self.price_df[
            (self.price_df['asin'] == asin) &
            (self.price_df['timestamp'] >= cutoff_date)
        ].sort_values('timestamp')
        
        return trend_data

# 使用示例
if __name__ == "__main__":
    client = PangolinAPIClient("[email protected]", "your_password")
    
    if client.authenticate():
        tracker = PriceTracker(client)
        
        # 定义要追踪的竞品ASIN列表
        competitor_asins = [
            "B08N5WRWNW",
            "B07ZPKN6YR",
            "B08L5VFJ2R"
        ]
        
        # 批量追踪价格
        print("开始追踪竞品价格...")
        tracker.batch_track_prices(competitor_asins)
        
        # 生成报告
        report = tracker.generate_price_report(competitor_asins)
        print(report)
        
        # 保存报告
        with open(f"data/price_report_{datetime.now().strftime('%Y%m%d')}.txt", 'w', encoding='utf-8') as f:
            f.write(report)

这个价格追踪系统的核心价值在于自动化和及时性。通过定时任务(可以使用cron或Windows任务计划程序)每天运行一次,你就能建立起完整的价格历史数据库,为定价决策提供数据支撑。进一步地,你可以结合机器学习算法预测价格趋势,或者根据竞品价格自动调整自己的定价策略,实现真正的智能化运营。

总结与最佳实践建议

通过本文的7个完整代码示例,我们系统地学习了Python调用Pangolin API的全部技能,从基础的环境配置、API认证,到高级的并发处理、错误重试,再到实战的监控系统和追踪系统。这些知识不仅适用于Pangolin API,也是开发任何API集成项目的通用方法论。

在实际应用中,有几个关键的最佳实践需要特别注意:首先是安全性,永远不要将API密钥硬编码在代码中,使用环境变量或加密的配置文件;其次是稳定性,完善的错误处理和重试机制是生产环境的必备要素;第三是性能,合理使用并发和缓存可以大幅提升系统效率;最后是可维护性,模块化的代码结构和清晰的文档注释能让项目长期健康发展。

如果你现在就想开始实践,建议按照以下步骤进行:第一,访问Pangolinfo官网注册账号并获取API密钥;第二,使用本文提供的代码示例搭建基础框架;第三,根据你的实际需求定制功能模块;第四,在小规模测试环境中验证系统稳定性;第五,逐步扩展到生产环境。记住,任何复杂的系统都是从简单的MVP开始迭代而来的,不要试图一次性实现所有功能,而是要持续优化和改进。

更多技术细节和API参数说明,请参考Pangolinfo API官方文档。如果在开发过程中遇到问题,可以访问开发者控制台查看API调用日志和积点使用情况。祝你开发顺利,用数据驱动业务增长!

立即开始Python API开发之旅 → 访问 Pangolinfo Scrape API 获取免费试用额度,或查看 完整API文档 了解更多技术细节。让Python代码为你的数据采集赋能!

解决方案

为电商场景打造的高可用数据采集 API,自动规避 IP 封禁、验证码拦截、代理故障等爬虫难题,无需复杂配置即可快速获取精准、稳定的电商数据。

AMZ Data Tracker 是亚马逊卖家专属的全方位运营工具,集关键词调研、竞品销量追踪、Listing 优化、恶意跟卖与差评监控于一体,助力卖家数据化决策,高效提升店铺销量与排名。

每周教程

准备好开始您的数据采集之旅了吗?

注册免费账户,立即体验强大的网页数据采集API,无需信用卡。

微信扫一扫
与我们联系

QR Code
快速测试

联系我们,您的问题,我们随时倾听

无论您在使用 Pangolin 产品的过程中遇到任何问题,或有任何需求与建议,我们都在这里为您提供支持。请填写以下信息,我们的团队将尽快与您联系,确保您获得最佳的产品体验。

Talk to our team

If you encounter any issues while using Pangolin products, please fill out the following information, and our team will contact you as soon as possible to ensure you have the best product experience.