Pangolinfo API 亚马逊数据采集完整指南:从接入到企业级集成

Pangolinfo
2026-05-25

Pangolinfo API 是什么?它能帮你解决什么问题?

Pangolinfo API 是一套面向电商数据的 enterprise-grade 数据采集接口,专为需要大规模、高时效性获取亚马逊平台公开数据的企业和开发者设计。通过标准化的 RESTful 接口,用户可以按需获取商品详情、关键词搜索结果、各类销售榜单、用户评论、SP广告位、类目节点等核心数据,覆盖亚马逊全球15个主要站点。

与传统自建爬虫方案相比,Pangolinfo API 将分布式采集、智能解析、反爬对抗、代理调度等复杂基础设施全部封装在服务端,开发者只需关注业务逻辑和数据应用。据平台统计,使用 Pangolinfo API 的企业平均将数据采集相关的工程投入降低了78%,同时将数据获取的稳定性从自建方案的82%提升至99.5%以上。

本文是一篇面向技术开发者的完整接入指南,将从认证授权、同步/异步调用方式、各类数据采集场景、返回数据结构解析、企业系统集成到 AI Agent 应用,提供可运行的代码示例和经过生产环境验证的最佳实践。

Pangolinfo API 体系全景:你需要了解的核心能力

在深入技术细节之前,先对 Pangolinfo 的数据产品体系建立整体认知。目前面向亚马逊数据采集的核心产品包括四大API和一项 Agent 技能:

核心 API 产品矩阵

API 产品核心能力典型应用场景数据时效
Scrape API通用电商页面采集,支持商品详情、搜索页、榜单页选品分析、竞品监控、价格追踪分钟级
Reviews Scraper API专业评论采集,支持全部评论、图片评论、视频评论用户洞察、差评分析、产品改进小时级
AI Overview SERP APIGoogle AI Overview 搜索结果采集SEO策略、竞品搜索表现分析实时
AMZ Data Tracker可视化数据监控与追踪面板运营监控、趋势分析、告警通知定时同步

数据覆盖范围

Pangolinfo API 当前支持亚马逊美国、加拿大、墨西哥、英国、德国、法国、意大利、西班牙、荷兰、瑞典、波兰、日本、澳大利亚、阿联酋、新加坡共15个站点。数据类型涵盖:

商品数据:标题、品牌、价格(当前价/历史价)、库存状态、配送方式、卖家信息、商品变体(颜色/尺寸/规格)、商品图片URL、A+内容、五点描述、技术参数。

排名与榜单数据:Best Sellers(热卖榜)、New Releases(新品榜)、Movers & Shakers(飙升榜)、Most Wished For(心愿单榜)、Gift Ideas(礼品榜)的实时排名、BSR(Best Sellers Rank)及类目路径。

评论数据:评论标题、内容、星级、评论者信息、 Vine 标识、图片/视频评论、 helpful 投票数、评论时间。

广告数据:SP(Sponsored Products)广告位商品信息、广告文案、投放位置(顶部/中部/底部/详情页)。

搜索数据:关键词搜索结果页的商品列表、广告位分布、自然排名与广告排名关系、搜索建议(Autocomplete)。

输出格式选择

所有 API 支持三种输出格式,通过请求参数 output_format 控制:

json(默认):结构化JSON,字段已解析,推荐用于大多数应用场景。

html:原始页面HTML,适合需要自行解析或存档的场景。

markdown:Markdown格式,便于内容处理和LLM输入。

接入准备:认证、额度与基础配置

第一步:获取 API Key

访问 Pangolinfo 控制台 注册账号并创建项目。在项目设置页面的 “API Keys” 标签下,点击 “Generate New Key” 即可获取专属的 API Key。建议为不同环境(开发/测试/生产)分别创建独立的 Key,便于权限管理和用量追踪。

API Key 的格式为 pgo_xxxxxxxxxxxxxxxx,需要在每个请求的 Header 中通过 Authorization: Bearer YOUR_API_KEY 传递。

第二步:了解额度与计费

Pangolinfo API 采用按量计费模式,计费单元为 “Page Credit”,即每成功采集一个页面消耗1个 Credit。不同数据类型的页面消耗如下:

数据类型每请求消耗 Credit说明
商品详情页1单个ASIN的商品详情
搜索列表页1每页通常包含20-24个商品
榜单页1每页50个商品
评论页1每页10条评论
广告位采集2包含广告位解析,消耗略高

新注册用户可获得一定数量的免费 Credit 用于测试。在生产环境中,建议根据业务量级选择合适的套餐,并开启用量告警。

第三步:基础 SDK 安装(可选)

虽然可以直接使用 HTTP 客户端调用 REST API,但 Pangolinfo 提供了官方 Python SDK 以简化开发:

pip install pangolinfo-api

SDK 封装了认证、重试、错误处理、分页等常用逻辑,推荐在生产环境中使用。本文的代码示例将同时展示原生 HTTP 请求和 SDK 调用两种方式。

同步接入方式详解:REST API 实时调用

同步接入是 Pangolinfo API 最直接的使用方式。客户端发送 HTTP 请求后,服务端实时执行采集并返回结果。这种方式适合对延迟敏感、数据量相对较小的场景,如单次商品查询、单关键词搜索结果获取等。

基础请求结构

所有同步接口的基地址为 https://api.pangolinfo.com/v1,采用 POST 方法,Content-Type 为 application/json。通用请求头如下:

POST /v1/amazon/scrape HTTP/1.1
Host: api.pangolinfo.com
Authorization: Bearer pgo_your_api_key_here
Content-Type: application/json

场景一:采集商品详情(同步)

商品详情采集是最基础也最常用的接口。通过传入站点和 ASIN 列表,即可获取完整的商品信息。

import requests
import json

API_KEY = "pgo_your_api_key_here"
BASE_URL = "https://api.pangolinfo.com/v1"

def get_product_details(site: str, asins: list):
    """同步获取商品详情"""
    url = f"{BASE_URL}/amazon/product"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "site": site,
        "asins": asins,
        "output_format": "json",
        "include_variants": True,
        "include_bsr": True,
        "include_seller_info": True
    }
    
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

# 获取3个商品的详情
result = get_product_details(
    site="amazon.com",
    asins=["B08N5WRWNW", "B08N5M7S6K", "B09V3KXJPB"]
)

for product in result.get("products", []):
    print(f"ASIN: {product['asin']}")
    print(f"标题: {product['title']}")
    print(f"价格: {product['price']['current']}")
    print(f"评分: {product['rating']['average']} ({product['rating']['count']} reviews)")
    print(f"BSR: {product['bsr']['rank']} in {product['bsr']['category']}")
    print("-" * 50)

返回数据结构(商品详情)

{
  "status": "success",
  "request_id": "req_abc123def456",
  "products": [
    {
      "asin": "B08N5WRWNW",
      "site": "amazon.com",
      "url": "https://www.amazon.com/dp/B08N5WRWNW",
      "title": "Apple AirPods (3rd Generation) with MagSafe Charging Case",
      "brand": "Apple",
      "price": {
        "current": 149.99,
        "currency": "USD",
        "list_price": 179.00,
        "savings_amount": 29.01,
        "savings_percent": 16
      },
      "rating": {
        "average": 4.5,
        "count": 84532,
        "five_star_percent": 72,
        "distribution": {
          "5_star": 60863,
          "4_star": 16906,
          "3_star": 4227,
          "2_star": 1268,
          "1_star": 1268
        }
      },
      "bsr": {
        "rank": 42,
        "category": "Electronics",
        "sub_category": "Earbud & In-Ear Headphones",
        "bsr_path": [
          {"node_id": "172282", "name": "Electronics"},
          {"node_id": "172541", "name": "Headphones"},
          {"node_id": "12097478011", "name": "Earbud & In-Ear Headphones"}
        ]
      },
      "images": {
        "main": "https://m.media-amazon.com/images/...",
        "gallery": [
          "https://m.media-amazon.com/images/...",
          "https://m.media-amazon.com/images/..."
        ]
      },
      "features": [
        "Spatial audio with dynamic head tracking",
        "Adaptive EQ automatically tunes music to your ears",
        "Force sensor lets you control your entertainment"
      ],
      "description": "Apple AirPods (3rd generation)...",
      "variants": [
        {
          "asin": "B08N5WRWNW",
          "attribute": "Color",
          "value": "White",
          "available": true
        }
      ],
      "seller": {
        "name": "Amazon.com",
        "is_amazon": true,
        "fulfillment": "FBA"
      },
      "availability": {
        "status": "In Stock",
        "message": "In Stock."
      },
      "dimensions": {
        "shipping_weight": "0.2 pounds",
        "product_dimensions": "2.11 x 1.83 x 0.84 inches"
      },
      "collected_at": "2026-05-25T08:30:00Z"
    }
  ],
  "credits_used": 3,
  "credits_remaining": 997
}

场景二:关键词搜索数据采集(同步)

关键词搜索接口用于获取亚马逊搜索结果页的商品列表,支持指定页码、排序方式和过滤条件。

def search_keywords(site: str, keyword: str, pages: int = 1):
    """同步关键词搜索采集"""
    url = f"{BASE_URL}/amazon/search"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    all_results = []
    
    for page in range(1, pages + 1):
        payload = {
            "site": site,
            "keyword": keyword,
            "page": page,
            "sort_by": "relevance",
            "output_format": "json",
            "include_sponsored": True,
            "include_bsr": True
        }
        
        response = requests.post(url, headers=headers, json=payload, timeout=45)
        response.raise_for_status()
        data = response.json()
        
        all_results.extend(data.get("products", []))
        
        # 打印广告位分布
        sponsored = [p for p in data.get("products", []) if p.get("is_sponsored")]
        print(f"第{page}页: 自然结果{len(data.get('products', [])) - len(sponsored)}个, 广告{sponsored}个")
    
    return all_results

# 搜索"wireless earbuds"前2页
products = search_keywords("amazon.com", "wireless earbuds", pages=2)
print(f"共获取 {len(products)} 个商品")

场景三:榜单数据采集(同步)

榜单接口支持获取各类亚马逊销售榜单的商品排名数据。

def get_best_sellers(site: str, node_id: str, pages: int = 1):
    """同步获取 Best Sellers 榜单"""
    url = f"{BASE_URL}/amazon/bestsellers"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "site": site,
        "node_id": node_id,
        "pages": pages,
        "output_format": "json",
        "include_trend": True
    }
    
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

# 获取 Electronics 类目 Best Sellers(node_id: 172282)
result = get_best_sellers("amazon.com", "172282", pages=2)

for item in result.get("products", []):
    trend_icon = "🔥" if item.get("trend") == "up" else "📉" if item.get("trend") == "down" else "➡️"
    print(f"#{item['rank']} {trend_icon} {item['title'][:60]}...")
    print(f"   ASIN: {item['asin']} | 价格: ${item['price']['current']} | 评分: {item['rating']['average']}")

场景四:评论数据采集(同步)

def get_reviews(site: str, asin: str, pages: int = 3):
    """同步获取商品评论"""
    url = f"{BASE_URL}/amazon/reviews"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "site": site,
        "asin": asin,
        "pages": pages,
        "sort_by": "recent",
        "filter": "all",
        "output_format": "json",
        "include_images": True
    }
    
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

# 获取某商品的评论
reviews = get_reviews("amazon.com", "B08N5WRWNW", pages=2)

for review in reviews.get("reviews", [])[:5]:
    print(f"⭐ {review['rating']}/5 | {review['title']}")
    print(f"   {review['content'][:100]}...")
    print(f"   by {review['author']} | {review['date']} | Helpful: {review.get('helpful_votes', 0)}")

同步接口的性能特征

同步接口的平均响应延迟为3-15秒,具体取决于目标站点的响应速度和页面复杂度。超时时间建议设置为30-45秒。单请求最多支持50个ASIN或5页搜索结果。对于更大的批量需求,建议使用异步接口。

异步接入方式详解:大规模批量采集的最佳实践

当你的业务需要一次性采集数千个商品、数百个关键词或连续监控多个榜单时,同步接口的串行等待模式会成为性能瓶颈。Pangolinfo API 提供的异步任务机制,允许你一次性提交大量任务,由服务端分布式集群并行处理,完成后通过 Webhook 回调或主动轮询获取结果。

异步采集的工作流程

异步模式遵循”提交-处理-回调”的三阶段模型:

阶段一:任务提交。客户端向任务提交端点发送请求,服务端立即返回任务ID,不等待采集完成。

阶段二:分布式处理。服务端将任务分发到全球多个采集节点并行执行,自动处理重试、代理轮换、反爬对抗。

阶段三:结果交付。任务完成后,服务端通过配置的 Webhook URL 推送结果,或客户端通过任务查询接口主动拉取。

提交异步任务

def submit_async_task(task_type: str, tasks: list):
    """提交异步采集任务"""
    url = f"{BASE_URL}/amazon/tasks"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "task_type": task_type,
        "tasks": tasks,
        "webhook_url": "https://your-domain.com/webhook/pangolinfo",
        "webhook_secret": "your_webhook_secret_for_signature_verification",
        "priority": "normal",
        "output_format": "json"
    }
    
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

# 示例:批量提交100个商品详情采集任务
asins = ["B08N5WRWNW", "B08N5M7S6K", "B09V3KXJPB", ...]  # 100个ASIN

tasks = [
    {"site": "amazon.com", "asin": asin}
    for asin in asins
]

result = submit_async_task("product_details", tasks)
print(f"任务批次ID: {result['batch_id']}")
print(f"任务总数: {result['task_count']}")
print(f"预计完成时间: {result['estimated_completion']}")

接收 Webhook 回调

Webhook 是异步结果交付的推荐方式。服务端会在任务完成(或失败)时向你的端点发送 HTTP POST 请求。

# Flask Webhook 接收示例
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret"

@app.route('/webhook/pangolinfo', methods=['POST'])
def handle_pangolinfo_webhook():
    # 验证签名
    signature = request.headers.get('X-Pangolinfo-Signature')
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256
    ).hexdigest()
    
    if not hmac.compare_digest(signature, expected):
        return jsonify({"error": "Invalid signature"}), 401
    
    data = request.json
    
    if data['event'] == 'task.completed':
        task_id = data['task_id']
        result = data['result']
        
        # 将结果存入数据库
        save_to_database(task_id, result)
        
        print(f"任务 {task_id} 完成,数据已保存")
        
    elif data['event'] == 'task.failed':
        task_id = data['task_id']
        error = data['error']
        
        # 记录失败,触发告警或重试
        log_failure(task_id, error)
        
        print(f"任务 {task_id} 失败: {error}")
    
    return jsonify({"status": "ok"}), 200

if __name__ == '__main__':
    app.run(port=5000)

主动轮询任务状态

如果无法提供 Webhook 端点,也可以通过轮询接口查询任务状态。

def check_task_status(batch_id: str):
    """查询异步任务状态"""
    url = f"{BASE_URL}/amazon/tasks/{batch_id}"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()

# 轮询直到任务完成
import time

batch_id = "batch_abc123"
while True:
    status = check_task_status(batch_id)
    
    print(f"进度: {status['completed']}/{status['total']} 完成, "
          f"{status['failed']} 失败, {status['pending']} 待处理")
    
    if status['status'] in ['completed', 'failed']:
        # 获取完整结果
        results = status.get('results', [])
        print(f"所有任务已结束,共获取 {len(results)} 条结果")
        break
    
    time.sleep(10)  # 每10秒轮询一次

异步 vs 同步:如何选择?

对比维度同步接口异步接口
响应延迟3-15秒立即返回任务ID
单次最大任务量50个ASIN / 5页10,000个任务/批次
结果获取方式HTTP响应体Webhook回调 / 轮询
适用场景实时查询、小批量大规模批量、定时任务
失败重试客户端自行处理服务端自动重试3次
并发限制100 req/min无硬性限制

高级请求场景:广告位、类目节点与多维度数据

场景五:SP广告位数据采集

亚马逊SP广告位数据的采集对竞品广告策略分析至关重要。Pangolinfo API 的 SP广告位采集率达到98%,为行业最高水平。

def get_sponsored_products(site: str, keyword: str, pages: int = 3):
    """采集关键词搜索结果中的SP广告位"""
    url = f"{BASE_URL}/amazon/sponsored"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "site": site,
        "keyword": keyword,
        "pages": pages,
        "ad_types": ["sponsored_products"],
        "output_format": "json"
    }
    
    response = requests.post(url, headers=headers, json=payload, timeout=45)
    response.raise_for_status()
    return response.json()

# 分析"protein powder"关键词的广告竞争
ads = get_sponsored_products("amazon.com", "protein powder", pages=2)

# 统计广告位分布
positions = {"top": 0, "middle": 0, "bottom": 0, "detail": 0}
brands = {}

for ad in ads.get("ads", []):
    positions[ad['position']] += 1
    brand = ad.get('brand', 'Unknown')
    brands[brand] = brands.get(brand, 0) + 1

print("广告位分布:", positions)
print("主要广告主:", sorted(brands.items(), key=lambda x: x[1], reverse=True)[:10])

返回数据结构(广告位)

{
  "ads": [
    {
      "asin": "B000QSNYGI",
      "title": "Optimum Nutrition Gold Standard Whey Protein Powder",
      "brand": "Optimum Nutrition",
      "price": {"current": 44.99, "currency": "USD"},
      "position": "top",
      "page_number": 1,
      "ad_badge": "Sponsored",
      "rating": {"average": 4.7, "count": 152340},
      "thumbnail": "https://m.media-amazon.com/images/...",
      "seller": "Optimum Nutrition Official"
    }
  ],
  "total_ads": 24,
  "pages_scanned": 2,
  "keyword": "protein powder"
}

场景六:类目节点数据采集

前文项目已详细介绍过类目节点数据的价值,这里展示如何通过 API 获取。

def get_category_tree(site: str, node_id: str, depth: int = 3):
    """获取类目节点层级结构"""
    url = f"{BASE_URL}/amazon/browse-nodes"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "site": site,
        "node_id": node_id,
        "depth": depth,
        "include_product_count": True
    }
    
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

# 获取 Kitchen & Dining 下3层类目树
tree = get_category_tree("amazon.com", "284507", depth=3)

def print_tree(nodes, level=0):
    for node in nodes:
        indent = "  " * level
        count = node.get("product_count", "N/A")
        print(f"{indent}└─ {node['name']} (ID: {node['node_id']}, Products: {count})")
        if "children" in node:
            print_tree(node["children"], level + 1)

print_tree(tree.get("nodes", []))

场景七:多站点批量对比采集

通过同时请求多个站点,可以快速对比同一商品在不同市场的表现。

def cross_marketplace_compare(asin: str, sites: list):
    """多站点商品数据对比"""
    url = f"{BASE_URL}/amazon/product"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    results = {}
    for site in sites:
        payload = {
            "site": site,
            "asins": [asin],
            "output_format": "json"
        }
        
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=30)
            data = response.json()
            product = data.get("products", [{}])[0]
            
            results[site] = {
                "title": product.get("title", ""),
                "price": product.get("price", {}),
                "rating": product.get("rating", {}),
                "availability": product.get("availability", {}),
                "bsr": product.get("bsr", {})
            }
        except Exception as e:
            results[site] = {"error": str(e)}
    
    return results

# 对比 AirPods 在美/日/德三站的数据
comparison = cross_marketplace_compare(
    "B08N5WRWNW",
    ["amazon.com", "amazon.co.jp", "amazon.de"]
)

for site, data in comparison.items():
    if "error" not in data:
        price = data['price'].get('current', 'N/A')
        rating = data['rating'].get('average', 'N/A')
        print(f"{site}: 价格={price}, 评分={rating}")

企业级集成:将 Pangolinfo API 接入你的数据系统与ERP

对于中大型企业而言,API 采集只是第一步。如何将采集到的数据无缝融入现有的数据仓库、ERP、BI系统或自建平台,是决定数据价值能否充分释放的关键环节。

集成架构模式一:直连数据库写入

最简单的集成方式是在获取 API 结果后,直接写入企业数据库。适合数据量不大、实时性要求中等的场景。

import psycopg2
from datetime import datetime

def save_products_to_db(products: list, db_config: dict):
    """将商品数据写入 PostgreSQL"""
    conn = psycopg2.connect(**db_config)
    cursor = conn.cursor()
    
    insert_sql = """
        INSERT INTO amazon_products 
        (asin, site, title, brand, current_price, currency, rating_avg, 
         rating_count, bsr_rank, bsr_category, collected_at, raw_json)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (asin, site) DO UPDATE SET
            title = EXCLUDED.title,
            current_price = EXCLUDED.current_price,
            rating_avg = EXCLUDED.rating_avg,
            rating_count = EXCLUDED.rating_count,
            bsr_rank = EXCLUDED.bsr_rank,
            collected_at = EXCLUDED.collected_at,
            raw_json = EXCLUDED.raw_json
    """
    
    for product in products:
        cursor.execute(insert_sql, (
            product['asin'],
            product['site'],
            product['title'],
            product.get('brand', ''),
            product.get('price', {}).get('current'),
            product.get('price', {}).get('currency', 'USD'),
            product.get('rating', {}).get('average'),
            product.get('rating', {}).get('count'),
            product.get('bsr', {}).get('rank'),
            product.get('bsr', {}).get('category', ''),
            datetime.now(),
            json.dumps(product)
        ))
    
    conn.commit()
    cursor.close()
    conn.close()
    print(f"成功写入/更新 {len(products)} 条商品数据")

集成架构模式二:消息队列中间件

对于高并发、高可用的生产环境,推荐通过消息队列解耦采集与消费流程。

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def publish_to_queue(channel: str, data: dict):
    """将数据发布到 Redis Stream"""
    redis_client.xadd(channel, {
        "data": json.dumps(data),
        "timestamp": datetime.now().isoformat()
    })

# 在 Webhook 接收端使用
@app.route('/webhook/pangolinfo', methods=['POST'])
def handle_webhook():
    data = request.json
    if data['event'] == 'task.completed':
        # 将结果推入队列,由消费者异步处理
        publish_to_queue("amazon:data:products", data['result'])
    return jsonify({"status": "ok"}), 200

集成架构模式三:数据仓库ETL管道

对于已有数据仓库(如Snowflake、BigQuery、阿里云MaxCompute)的企业,可以构建定时ETL任务。

# 使用 Airflow DAG 示例(伪代码)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def extract_amazon_data(**context):
    """从 Pangolinfo API 抽取数据"""
    # 调用异步接口提交任务
    batch_id = submit_async_task("product_details", task_list)
    # 等待完成并获取结果
    results = wait_for_completion(batch_id)
    return results

def transform_data(**context):
    """数据清洗与转换"""
    raw_data = context['ti'].xcom_pull(task_ids='extract')
    # 清洗、标准化、去重
    return cleaned_data

def load_to_warehouse(**context):
    """加载到数据仓库"""
    data = context['ti'].xcom_pull(task_ids='transform')
    # 批量写入 Snowflake/BigQuery
    bulk_insert(data)

with DAG(
    'amazon_data_pipeline',
    start_date=datetime(2026, 1, 1),
    schedule_interval='0 6 * * *',  # 每天早6点运行
    catchup=False
) as dag:
    
    extract = PythonOperator(task_id='extract', python_callable=extract_amazon_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    load = PythonOperator(task_id='load', python_callable=load_to_warehouse)
    
    extract >> transform >> load

ERP系统对接实战:以跨境电商ERP为例

跨境电商ERP通常需要以下亚马逊数据来支撑运营决策:

选品模块:关键词搜索量、竞品数量、平均评分、价格带分布。

采购模块:商品尺寸重量、变体信息、库存状态。

定价模块:竞品实时价格、历史价格趋势、促销信息。

库存模块:Best Sellers排名变化、销售速度估算。

class AmazonERPConnector:
    """亚马逊数据ERP连接器"""
    
    def __init__(self, api_key: str, erp_db):
        self.api_key = api_key
        self.erp = erp_db
        self.base_url = "https://api.pangolinfo.com/v1"
    
    def sync_product_catalog(self, site: str, category_node: str):
        """同步指定类目的商品目录到ERP"""
        # 1. 获取类目下所有商品
        products = self._fetch_category_products(site, category_node)
        
        # 2. 转换为ERP标准格式
        erp_products = [
            {
                "sku": f"AMZ-{p['asin']}",
                "name": p['title'],
                "category": p['bsr']['category'],
                "cost_price": p['price']['current'] * 0.3,  # 估算成本
                "sale_price": p['price']['current'],
                "weight": self._parse_weight(p['dimensions']),
                "dimensions": p['dimensions']['product_dimensions'],
                "supplier": p['brand'],
                "source_url": p['url'],
                "source_platform": "amazon",
                "competitor_rating": p['rating']['average'],
                "competitor_reviews": p['rating']['count'],
                "last_synced": datetime.now()
            }
            for p in products
        ]
        
        # 3. 批量写入ERP
        self.erp.bulk_upsert_products(erp_products)
        return len(erp_products)
    
    def update_competitive_pricing(self, site: str, sku_list: list):
        """更新竞品价格监控数据"""
        asins = [sku.replace("AMZ-", "") for sku in sku_list]
        
        # 批量获取最新价格
        products = self._fetch_products(site, asins)
        
        for product in products:
            self.erp.update_price_monitor(
                sku=f"AMZ-{product['asin']}",
                competitor_price=product['price']['current'],
                list_price=product['price'].get('list_price'),
                price_change_percent=self._calc_price_change(product),
                collected_at=datetime.now()
            )
    
    def _fetch_category_products(self, site, node_id):
        # 调用 Pangolinfo API 获取类目商品
        pass
    
    def _fetch_products(self, site, asins):
        # 调用 Pangolinfo API 批量获取商品
        pass

AI Agent 集成:让大模型直接调用亚马逊数据能力

大语言模型与外部数据工具的连接正在重塑数据分析的工作流。Pangolinfo Amazon Scraper Skill 基于 MCP(Model Context Protocol)协议,让 AI Agent 能够直接调用亚马逊数据采集能力,无需编写代码即可获取实时电商数据。

MCP 协议简介

MCP 是一种开放标准协议,定义了 AI Agent 与外部工具之间的通信规范。通过 MCP,Agent 可以理解工具的输入参数、调用方式和返回格式,实现自主决策和任务执行。Pangolinfo 是电商数据领域首批支持 MCP 协议的服务商之一。

配置 Pangolinfo Amazon Scraper Skill

在支持 MCP 的 Agent 平台(如 Claude Desktop、OpenClaw、LangChain 等)中,添加以下配置:

{
  "mcpServers": {
    "pangolinfo-amazon": {
      "command": "npx",
      "args": ["-y", "@pangolinfo/amazon-scraper-mcp@latest"],
      "env": {
        "PANGOLINFO_API_KEY": "pgo_your_api_key_here"
      }
    }
  }
}

Agent 自主采集实战案例

配置完成后,Agent 即可通过自然语言指令执行复杂的数据采集任务:

案例一:选品市场调研

用户指令:”帮我调研美国站无线耳机市场的竞争格局,我需要知道:1)搜索结果前3页有多少个品牌;2)价格带分布;3)评论数在1000以上的商品占比。”

Agent 执行流程:

1. 调用 amazon_search 工具,keyword=”wireless earbuds”, pages=3, site=”amazon.com”

2. 解析返回的60个商品数据,提取品牌、价格、评论数字段

3. 统计分析:品牌去重计数、价格区间分布(<50/50-100/100-200/>200)、评论数>=1000的商品比例

4. 生成结构化报告返回给用户

案例二:竞品监控告警

用户指令:”监控ASIN B08N5WRWNW的价格和排名变化,如果价格下降超过10%或BSR排名跌出前100,通知我。”

Agent 执行流程:

1. 调用 amazon_product 工具获取当前数据

2. 查询历史数据(从记忆或外部存储)

3. 计算变化幅度,判断是否触发告警条件

4. 如触发,生成告警消息并推荐应对策略

案例三:批量评论情感分析

用户指令:”获取这5个竞品ASIN最近100条评论,分析用户最满意和最不满意的三个点。”

Agent 执行流程:

1. 调用 amazon_reviews 工具批量获取评论(5个ASIN × 10页)

2. 对500条评论进行主题提取和情感分类

3. 聚类分析:提取高频正面/负面关键词

4. 输出洞察报告:”用户最满意:音质(87%)、续航(72%)、舒适度(65%);最不满意:充电速度(23%)、连接稳定性(19%)、价格(15%)”

Agent 集成的技术实现

如果你正在开发自己的 Agent 平台,可以通过以下方式集成 Pangolinfo 数据能力:

from langchain.tools import BaseTool
from pydantic import BaseModel, Field

class AmazonSearchInput(BaseModel):
    keyword: str = Field(description="Search keyword on Amazon")
    site: str = Field(default="amazon.com", description="Amazon marketplace")
    pages: int = Field(default=1, ge=1, le=5, description="Number of pages to scrape")

class AmazonSearchTool(BaseTool):
    name = "amazon_search"
    description = "Search for products on Amazon and return structured data including titles, prices, ratings, and BSR"
    args_schema = AmazonSearchInput
    
    def _run(self, keyword: str, site: str = "amazon.com", pages: int = 1):
        """同步执行搜索"""
        import requests
        
        response = requests.post(
            "https://api.pangolinfo.com/v1/amazon/search",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"site": site, "keyword": keyword, "pages": pages}
        )
        return response.json()
    
    async def _arun(self, **kwargs):
        """异步执行(可选)"""
        raise NotImplementedError("Async not implemented")

# 注册工具到 Agent
from langchain.agents import initialize_agent, Tool

tools = [
    Tool(
        name="amazon_search",
        func=AmazonSearchTool()._run,
        description="Search Amazon for products"
    )
]

agent = initialize_agent(
    tools,
    llm,  # 你的大模型实例
    agent="zero-shot-react-description",
    verbose=True
)

# 使用
result = agent.run("Find the top 10 wireless earbuds on Amazon US under $100 with 4.5+ rating")

Agent 场景分析的最佳实践

1. 工具描述要精确。Agent 依赖工具描述来决定是否调用某个工具。描述中应包含清晰的输入参数说明和典型使用场景。

2. 结果格式化。将 API 返回的原始 JSON 转换为 Agent 易读的格式(如 Markdown 表格),可显著提升 Agent 的理解和分析能力。

3. 缓存策略。对于不频繁变化的数据(如商品详情),在 Agent 侧设置短期缓存(如5分钟),减少重复API调用。

4. 错误处理。当 API 调用失败时,向 Agent 返回清晰的错误信息而非原始异常堆栈,帮助 Agent 决定重试或调整策略。

生产环境最佳实践与性能优化

1. 请求去重与缓存

在电商数据场景中,同一商品在短时间内被多次查询的情况非常普遍。建立合理的缓存层可以大幅降低 API 调用成本。

import functools
import redis
from datetime import timedelta

cache = redis.Redis()

def cached_api_call(ttl_seconds: int = 300):
    """API调用结果缓存装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = f"pangolinfo:{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            cached = cache.get(cache_key)
            if cached:
                return json.loads(cached)
            
            result = func(*args, **kwargs)
            cache.setex(cache_key, timedelta(seconds=ttl_seconds), json.dumps(result))
            return result
        return wrapper
    return decorator

@cached_api_call(ttl_seconds=600)
def get_product_with_cache(site: str, asin: str):
    """带缓存的商品查询"""
    return get_product_details(site, [asin])

2. 速率限制与退避策略

虽然 Pangolinfo API 的并发限制较为宽松,但生产环境中仍应实现优雅的速率控制。

import time
from functools import wraps

def rate_limit(max_per_minute: int = 100):
    """速率限制装饰器"""
    min_interval = 60.0 / max_per_minute
    last_call_time = {}
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = func.__name__
            now = time.time()
            
            if key in last_call_time:
                elapsed = now - last_call_time[key]
                if elapsed < min_interval:
                    time.sleep(min_interval - elapsed)
            
            last_call_time[key] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

3. 数据质量校验

采集数据的质量直接影响下游分析的准确性。建议在入库前执行以下校验:

def validate_product_data(product: dict) -> tuple[bool, list]:
    """校验商品数据质量"""
    errors = []
    
    if not product.get('asin'):
        errors.append("ASIN缺失")
    
    if not product.get('title') or len(product['title']) < 5:
        errors.append("标题异常")
    
    price = product.get('price', {}).get('current')
    if price is None or price <= 0:
        errors.append("价格异常")
    
    rating = product.get('rating', {}).get('average')
    if rating and (rating < 1 or rating > 5):
        errors.append("评分超出范围")
    
    return len(errors) == 0, errors

# 在数据入库前使用
for product in results.get("products", []):
    is_valid, errors = validate_product_data(product)
    if not is_valid:
        log_warning(f"数据质量异常: {product.get('asin')} - {errors}")
        continue
    save_to_db(product)

4. 监控与告警

建立完善的监控体系,及时发现采集异常:

def monitor_api_health():
    """API健康状态检查"""
    try:
        start = time.time()
        response = requests.post(
            "https://api.pangolinfo.com/v1/amazon/product",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"site": "amazon.com", "asins": ["B08N5WRWNW"]},
            timeout=30
        )
        latency = time.time() - start
        
        metrics = {
            "status": response.status_code,
            "latency_ms": latency * 1000,
            "success": response.status_code == 200,
            "timestamp": datetime.now().isoformat()
        }
        
        # 推送监控指标到 Prometheus/Datadog
        push_metrics(metrics)
        
        if latency > 20:
            send_alert(f"API延迟过高: {latency:.2f}s")
        
        if response.status_code != 200:
            send_alert(f"API异常: HTTP {response.status_code}")
            
    except Exception as e:
        send_alert(f"API健康检查失败: {str(e)}")

5. 成本控制策略

策略具体做法预期节省
增量更新只采集变更字段,非全量刷新30-50%
智能调度根据站点时区在低峰期执行批量任务10-20%
本地缓存热点数据缓存5-30分钟20-40%
异步优先大批量任务走异步,降低失败重试成本15-25%
数据复用同一批次数据供多个业务模块使用25-35%

总结:从数据接入到业务价值

本文系统介绍了 Pangolinfo API 在亚马逊数据采集场景下的完整使用方案。从同步 REST API 的实时调用,到异步任务的批量处理;从商品详情、关键词搜索、榜单监控、评论采集到广告位分析;从直连数据库的简单集成,到消息队列中间件的解耦架构,再到 AI Agent 的智能化应用——每个环节都提供了经过生产环境验证的代码示例和最佳实践。

电商数据采集的本质不是技术炫技,而是为业务决策提供及时、准确、完整的数据支撑。选择合适的技术方案,建立可靠的数据管道,让团队将精力集中在洞察提炼和策略制定上,这才是数据驱动运营的真正含义。

无论你是正在搭建选品系统的独立开发者,还是负责企业数据平台的技术负责人,亦或是探索 AI Agent 应用的先行者,Pangolinfo API 都提供了与之匹配的能力层级和集成路径。从单个接口调用开始,逐步构建你的数据基础设施,最终形成企业级的竞争优势。

立即开始构建你的亚马逊数据能力:访问 Pangolinfo Scrape API 获取 API Key 和完整文档,或了解 Amazon Scraper Skill 如何让你的 AI Agent 直接获得电商数据采集能力。详细调用请阅读 Amazon Scrape API 文档

常见问题解答(FAQ)

Pangolinfo API 支持哪些亚马逊数据采集场景?

Pangolinfo API 支持亚马逊关键词搜索、热卖榜单(Best Sellers)、新品榜(New Releases)、商品详情、评论采集、SP广告位、类目节点、AI Overview SERP 等多种数据采集场景,覆盖亚马逊全球15个主要站点。

同步接入和异步接入有什么区别?

同步接入通过 REST API 直接返回结果,适合单次、实时性要求高的请求,延迟通常在3-15秒。异步接入通过提交任务后由 Webhook 回调返回结果,适合大规模批量采集,单次可提交数千个请求,由后台分布式集群处理。

如何将 Pangolinfo API 接入企业 ERP 系统?

可通过三种方式集成:1)直接调用 REST API 将数据写入 ERP 数据库;

2)通过中间件服务定期同步数据到 ERP;

3)使用 Pangolinfo 提供的 webhook 推送机制,实时将采集结果推送到 ERP 的接收端点。

Pangolinfo API 的返回数据格式是什么?

默认返回结构化 JSON 格式,包含完整的商品字段(标题、价格、评分、评论数、BSR等)、榜单排名信息、广告位数据、评论内容等。同时支持原始 HTML 和 Markdown 格式输出,满足不同解析需求。

如何通过 Pangolinfo Amazon Scraper Skill 让 AI Agent 采集数据?

Pangolinfo Amazon Scraper Skill 基于 MCP 协议,AI Agent 可直接调用数据采能能力。在 Agent 配置中添加 Skill 端点,Agent 即可通过自然语言指令触发亚马逊数据采集,如”获取iPhone充电器关键词排名前50的商品数据”。

微信扫一扫
与我们联系

QR Code
快速测试

联系我们,您的问题,我们随时倾听

无论您在使用 Pangolin 产品的过程中遇到任何问题,或有任何需求与建议,我们都在这里为您提供支持。请填写以下信息,我们的团队将尽快与您联系,确保您获得最佳的产品体验。

Talk to our team

If you encounter any issues while using Pangolin products, please fill out the following information, and our team will contact you as soon as possible to ensure you have the best product experience.