当你第一次尝试从亚马逊采集商品数据时,可能会遇到这样的困境:花费数小时编写爬虫代码,调试CSS选择器和XPath表达式,好不容易跑通了脚本,第二天却发现亚马逊更新了页面结构,所有代码都需要重写。更令人头疼的是,即使代码能正常运行,也经常因为反爬虫机制被封IP,或者因为请求频率过高导致账号被限制。这种”今天能用,明天就坏”的状态,让很多开发者在数据采集项目上投入了大量时间却收效甚微,项目进度一拖再拖,技术债务越积越多。
问题的根源在于,传统的网页爬虫开发需要开发者同时处理太多底层细节:HTTP请求的构建、Cookie和Session管理、反爬虫对抗、页面解析、数据清洗、异常处理、并发控制等等。一个看似简单的”获取商品价格”需求,实际代码可能超过500行,而且这些代码的大部分都在处理与业务逻辑无关的技术问题。更关键的是,这种方式的维护成本极高——每当目标网站更新页面结构,你就需要重新分析DOM树、调整选择器、测试验证,整个流程可能需要几天甚至一周时间。
Pangolin API的出现彻底改变了这个局面。通过提供稳定的RESTful接口和结构化的JSON数据返回,它将复杂的数据采集工作简化为几行Python代码的API调用。你不需要关心亚马逊的页面结构如何变化,不需要处理反爬虫机制,不需要维护代理IP池,只需要专注于你的业务逻辑——获取数据、分析数据、应用数据。本文将通过7个完整的代码示例,从环境配置到实战项目,手把手教你掌握Python调用Pangolin API的全部技能,让你在30分钟内就能构建出一个可用的数据采集系统。
第一步:Python环境配置与依赖安装
在开始编写代码之前,我们需要搭建一个合适的Python开发环境。Pangolin API对Python版本的要求非常宽松,支持Python 3.7及以上的所有版本,但我强烈建议使用Python 3.9或3.10,因为这两个版本在性能和稳定性上都有显著提升,而且与主流的数据分析库兼容性最好。如果你的系统中还没有安装Python,可以从官网下载对应操作系统的安装包,Windows用户建议勾选”Add Python to PATH”选项,这样可以在命令行中直接使用python命令。
环境准备好后,我们需要安装必要的Python库。Pangolin API本身不需要专门的SDK,只需要使用Python标准库中的requests模块即可完成所有操作,这大大降低了项目的依赖复杂度。打开命令行工具,执行以下命令安装依赖包:
# 安装核心依赖
pip install requests
# 安装数据处理库(可选,用于高级数据分析)
pip install pandas numpy
# 安装JSON处理增强库(可选,用于美化输出)
pip install json5
安装完成后,我们可以创建一个项目目录来组织代码。建议的项目结构如下:
pangolin-api-project/
├── config.py # 配置文件(存储API密钥等)
├── api_client.py # API客户端封装
├── examples/ # 示例代码目录
│ ├── basic_call.py # 基础调用示例
│ ├── batch_fetch.py # 批量采集示例
│ └── monitor.py # 监控项目示例
├── utils/ # 工具函数
│ ├── error_handler.py # 错误处理
│ └── data_parser.py # 数据解析
└── data/ # 数据存储目录
├── raw/ # 原始数据
└── processed/ # 处理后的数据
这种结构化的组织方式不仅让代码更易于维护,也为后续的功能扩展预留了空间。接下来,我们在config.py中配置API认证信息。需要注意的是,永远不要将API密钥硬编码在代码中,也不要提交到Git仓库,正确的做法是使用环境变量或配置文件,并将配置文件加入.gitignore。
第二步:API认证与Token获取
Pangolin API使用Bearer Token认证方式,这是一种简单而安全的认证机制。整个认证流程分为两步:首先使用邮箱和密码获取Token,然后在后续的所有API请求中携带这个Token。Token是长期有效的,这意味着你只需要在程序启动时获取一次,就可以在整个会话期间重复使用,无需每次请求都重新认证。
让我们编写第一个完整的Python代码示例,实现API认证功能:
import requests
import json
from typing import Optional
class PangolinAPIClient:
"""Pangolin API客户端封装类"""
def __init__(self, email: str, password: str):
"""
初始化API客户端
Args:
email: 注册邮箱
password: 账户密码
"""
self.base_url = "https://scrapeapi.pangolinfo.com"
self.email = email
self.password = password
self.token: Optional[str] = None
def authenticate(self) -> bool:
"""
执行API认证,获取访问Token
Returns:
bool: 认证是否成功
"""
auth_url = f"{self.base_url}/api/v1/auth"
payload = {
"email": self.email,
"password": self.password
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(
auth_url,
json=payload,
headers=headers,
timeout=10
)
# 检查HTTP状态码
response.raise_for_status()
# 解析响应
result = response.json()
if result.get("code") == 0:
self.token = result.get("data")
print(f"✓ 认证成功!Token: {self.token[:20]}...")
return True
else:
print(f"✗ 认证失败:{result.get('message')}")
return False
except requests.exceptions.Timeout:
print("✗ 请求超时,请检查网络连接")
return False
except requests.exceptions.RequestException as e:
print(f"✗ 请求异常:{str(e)}")
return False
except json.JSONDecodeError:
print("✗ 响应解析失败,返回数据格式错误")
return False
def get_headers(self) -> dict:
"""
获取带认证信息的请求头
Returns:
dict: 包含Authorization的请求头
"""
if not self.token:
raise ValueError("Token未初始化,请先调用authenticate()方法")
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}"
}
# 使用示例
if __name__ == "__main__":
# 从环境变量或配置文件读取凭证
client = PangolinAPIClient(
email="[email protected]",
password="your_password"
)
# 执行认证
if client.authenticate():
print("API客户端初始化完成,可以开始调用数据接口")
else:
print("认证失败,请检查邮箱和密码是否正确")
这段代码展示了几个重要的最佳实践:使用类封装提高代码复用性、完善的异常处理机制、清晰的日志输出、类型注解增强代码可读性。特别值得注意的是timeout参数的设置,这可以防止网络问题导致程序长时间挂起。在生产环境中,建议将Token缓存到文件或数据库中,避免频繁调用认证接口。
第三步:基础API调用 – 获取商品详情
完成认证后,我们就可以开始调用Pangolin的核心功能了。最常用的场景是获取亚马逊商品的详细信息,包括标题、价格、评分、评论数、库存状态等数十个字段。Pangolin API支持三种数据返回格式:json(结构化数据)、rawHtml(原始HTML)和markdown(Markdown格式),对于大多数应用场景,json格式是最佳选择,因为它已经完成了数据清洗和结构化处理,可以直接用于业务逻辑。
让我们编写一个完整的商品详情获取函数:
def fetch_product_detail(self, asin: str, zipcode: str = "10041") -> dict:
"""
获取亚马逊商品详情
Args:
asin: 商品ASIN码
zipcode: 邮编(用于获取特定地区的价格和库存)
Returns:
dict: 商品详情数据
"""
scrape_url = f"{self.base_url}/api/v1/scrape"
payload = {
"url": f"https://www.amazon.com/dp/{asin}",
"parserName": "amzProductDetail",
"format": "json",
"bizContext": {
"zipcode": zipcode
}
}
try:
response = requests.post(
scrape_url,
json=payload,
headers=self.get_headers(),
timeout=30 # 商品详情页数据较多,适当延长超时时间
)
response.raise_for_status()
result = response.json()
if result.get("code") == 0:
# 提取核心数据
data = result.get("data", {})
json_data = data.get("json", [{}])[0]
if json_data.get("code") == 0:
product_info = json_data.get("data", {}).get("results", [{}])[0]
# 提取关键字段
simplified_data = {
"asin": product_info.get("asin"),
"title": product_info.get("title"),
"price": product_info.get("price"),
"rating": product_info.get("star"),
"review_count": product_info.get("rating"),
"brand": product_info.get("brand"),
"image": product_info.get("image"),
"in_stock": product_info.get("has_cart"),
"category": product_info.get("category_name")
}
print(f"✓ 成功获取商品 {asin} 的详情")
return simplified_data
else:
print(f"✗ 数据解析失败:{json_data.get('message')}")
return {}
else:
error_msg = result.get("message")
if result.get("code") == 2001:
print("✗ 积点余额不足,请充值后继续使用")
elif result.get("code") == 1004:
print("✗ Token无效,请重新认证")
else:
print(f"✗ API调用失败:{error_msg}")
return {}
except requests.exceptions.Timeout:
print(f"✗ 请求超时:商品 {asin} 数据获取失败")
return {}
except Exception as e:
print(f"✗ 未知错误:{str(e)}")
return {}
# 使用示例
if __name__ == "__main__":
client = PangolinAPIClient("[email protected]", "your_password")
if client.authenticate():
# 获取单个商品详情
product = client.fetch_product_detail("B0DYTF8L2W")
if product:
print("\n商品信息:")
print(f"标题:{product['title']}")
print(f"价格:{product['price']}")
print(f"评分:{product['rating']} ({product['review_count']}条评论)")
print(f"品牌:{product['brand']}")
print(f"库存:{'有货' if product['in_stock'] else '缺货'}")
这个示例展示了几个关键技术点:首先是parserName参数的使用,它告诉API使用哪个解析模板来提取数据,Pangolin支持多种解析器包括商品详情、关键词搜索、榜单等;其次是bizContext的应用,通过指定邮编可以获取特定地区的价格和库存信息,这对于跨区域价格对比非常有用;最后是错误码的处理,不同的错误码代表不同的问题,需要针对性地处理。
第四步:批量数据采集与并发优化
在实际应用中,我们往往需要采集大量商品的数据,比如监控一个类目下的所有竞品,或者追踪自己店铺的全部SKU。如果使用串行方式逐个调用API,效率会非常低下——假设每个请求耗时10秒,采集1000个商品就需要近3小时。通过并发处理,我们可以将时间缩短到10-15分钟,效率提升10倍以上。
Python提供了多种并发方案,对于IO密集型的API调用场景,使用concurrent.futures模块的ThreadPoolExecutor是最佳选择。它提供了简洁的接口,自动管理线程池,并且支持异常处理和结果收集。让我们实现一个批量采集函数:
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
import time
def batch_fetch_products(self, asin_list: List[str], max_workers: int = 5) -> List[dict]:
"""
批量获取商品详情(并发版本)
Args:
asin_list: ASIN列表
max_workers: 最大并发线程数(建议5-10)
Returns:
List[dict]: 商品详情列表
"""
results = []
failed_asins = []
print(f"开始批量采集 {len(asin_list)} 个商品...")
start_time = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_asin = {
executor.submit(self.fetch_product_detail, asin): asin
for asin in asin_list
}
# 收集结果
for future in as_completed(future_to_asin):
asin = future_to_asin[future]
try:
product_data = future.result()
if product_data:
results.append(product_data)
else:
failed_asins.append(asin)
except Exception as e:
print(f"✗ 处理 {asin} 时发生异常:{str(e)}")
failed_asins.append(asin)
elapsed_time = time.time() - start_time
print(f"\n批量采集完成!")
print(f"成功:{len(results)} 个")
print(f"失败:{len(failed_asins)} 个")
print(f"耗时:{elapsed_time:.2f} 秒")
print(f"平均速度:{len(asin_list)/elapsed_time:.2f} 个/秒")
if failed_asins:
print(f"失败的ASIN:{', '.join(failed_asins)}")
return results
# 使用示例
if __name__ == "__main__":
client = PangolinAPIClient("[email protected]", "your_password")
if client.authenticate():
# 准备要采集的ASIN列表
asin_list = [
"B0DYTF8L2W",
"B08N5WRWNW",
"B07ZPKN6YR",
"B08L5VFJ2R",
"B09JQMJHXY"
]
# 批量采集
products = client.batch_fetch_products(asin_list, max_workers=5)
# 保存到CSV文件
if products:
import pandas as pd
df = pd.DataFrame(products)
df.to_csv("products.csv", index=False, encoding="utf-8-sig")
print(f"\n数据已保存到 products.csv")
这段代码的关键在于max_workers参数的设置。虽然理论上线程数越多速度越快,但实际上受限于API服务器的处理能力和你的网络带宽,过多的并发请求可能导致超时或被限流。根据实测,5-10个并发线程是最佳平衡点,既能充分利用并发优势,又不会给服务器造成过大压力。如果你有大量数据需要采集(比如上万个ASIN),建议分批处理,每批1000个左右,批次之间间隔几秒钟。
第五步:完善的错误处理与重试机制
在生产环境中,网络波动、服务器临时故障、数据格式异常等问题不可避免,一个健壮的系统必须具备完善的错误处理和自动重试能力。Pangolin API返回的错误码非常详细,包括积点不足(2001)、Token无效(1004)、账户过期(2007)、爬取失败(10000/10001)等,我们需要针对不同的错误类型采取不同的处理策略。
让我们实现一个带有指数退避重试机制的API调用装饰器:
import functools
import time
from typing import Callable, Any
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0
):
"""
带指数退避的重试装饰器
Args:
max_retries: 最大重试次数
base_delay: 基础延迟时间(秒)
max_delay: 最大延迟时间(秒)
exponential_base: 指数基数
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
retries = 0
while retries <= max_retries:
try:
return func(*args, **kwargs)
except requests.exceptions.Timeout:
retries += 1
if retries > max_retries:
print(f"✗ 超过最大重试次数 ({max_retries}),放弃请求")
raise
# 计算延迟时间(指数退避)
delay = min(
base_delay * (exponential_base ** (retries - 1)),
max_delay
)
print(f"⟳ 请求超时,{delay:.1f}秒后进行第{retries}次重试...")
time.sleep(delay)
except requests.exceptions.RequestException as e:
# 对于非超时的网络错误,不重试
print(f"✗ 网络错误:{str(e)}")
raise
return wrapper
return decorator
class PangolinAPIClientWithRetry(PangolinAPIClient):
"""带重试机制的API客户端"""
@retry_with_backoff(max_retries=3, base_delay=2.0)
def fetch_product_detail_with_retry(self, asin: str, zipcode: str = "10041") -> dict:
"""
获取商品详情(带重试)
这个方法会在遇到超时错误时自动重试,
重试间隔采用指数退避策略:2秒、4秒、8秒
"""
return self.fetch_product_detail(asin, zipcode)
def handle_api_error(self, error_code: int, error_message: str) -> str:
"""
统一的错误处理函数
Args:
error_code: API返回的错误码
error_message: 错误信息
Returns:
str: 用户友好的错误提示
"""
error_handlers = {
0: "操作成功",
1004: "Token无效,请重新登录认证",
2001: "积点余额不足,请前往控制台充值",
2007: "账户已过期,请联系客服续费",
10000: "数据爬取失败,请稍后重试",
10001: "数据爬取失败,目标页面可能不存在",
404: "URL地址错误,请检查链接格式"
}
user_message = error_handlers.get(
error_code,
f"未知错误 (代码: {error_code})"
)
# 记录详细错误日志
self.log_error(error_code, error_message)
return user_message
def log_error(self, error_code: int, error_message: str):
"""记录错误日志到文件"""
import datetime
log_entry = {
"timestamp": datetime.datetime.now().isoformat(),
"error_code": error_code,
"error_message": error_message
}
# 这里可以扩展为写入日志文件或发送告警
print(f"[ERROR LOG] {log_entry}")
指数退避策略的核心思想是:第一次重试等待时间较短,如果仍然失败,则逐次增加等待时间,这样既能快速恢复临时性故障,又能避免在服务器压力大时雪上加霜。在实际使用中,你可以根据业务特点调整重试参数,比如对于实时性要求高的场景可以减少重试次数,对于批量后台任务可以增加重试次数和延迟时间。
实战项目一:Best Seller榜单实时监控系统
让我们将前面学到的知识整合起来,构建一个真实的应用场景:监控亚马逊Best Seller榜单的变化。这个系统可以帮助卖家及时发现热门产品趋势,分析竞品动态,为选品和运营决策提供数据支持。我们将实现以下功能:定时采集榜单数据、对比排名变化、识别新上榜产品、生成变化报告。
import json
import time
from datetime import datetime
from typing import List, Dict
class BestSellerMonitor:
"""Best Seller榜单监控器"""
def __init__(self, api_client: PangolinAPIClient):
self.client = api_client
self.history_file = "data/bestseller_history.json"
self.load_history()
def load_history(self):
"""加载历史数据"""
try:
with open(self.history_file, 'r', encoding='utf-8') as f:
self.history = json.load(f)
except FileNotFoundError:
self.history = {}
def save_history(self):
"""保存历史数据"""
with open(self.history_file, 'w', encoding='utf-8') as f:
json.dump(self.history, f, ensure_ascii=False, indent=2)
def fetch_bestseller_list(self, category_url: str) -> List[Dict]:
"""
获取Best Seller榜单
Args:
category_url: 类目URL
Returns:
List[Dict]: 榜单商品列表
"""
scrape_url = f"{self.client.base_url}/api/v1/scrape"
payload = {
"url": category_url,
"parserName": "amzBestSellers",
"format": "json",
"bizContext": {
"zipcode": "10041"
}
}
try:
response = requests.post(
scrape_url,
json=payload,
headers=self.client.get_headers(),
timeout=30
)
response.raise_for_status()
result = response.json()
if result.get("code") == 0:
data = result.get("data", {})
json_data = data.get("json", [{}])[0]
if json_data.get("code") == 0:
products = json_data.get("data", {}).get("results", [])
print(f"✓ 成功获取 {len(products)} 个榜单商品")
return products
return []
except Exception as e:
print(f"✗ 获取榜单失败:{str(e)}")
return []
def compare_rankings(self, current_data: List[Dict], previous_data: List[Dict]) -> Dict:
"""
对比排名变化
Args:
current_data: 当前榜单数据
previous_data: 历史榜单数据
Returns:
Dict: 变化分析结果
"""
changes = {
"new_entries": [], # 新上榜
"rank_up": [], # 排名上升
"rank_down": [], # 排名下降
"dropped_out": [] # 跌出榜单
}
# 构建ASIN到排名的映射
current_ranks = {item['asin']: idx + 1 for idx, item in enumerate(current_data)}
previous_ranks = {item['asin']: idx + 1 for idx, item in enumerate(previous_data)}
# 识别新上榜商品
for asin in current_ranks:
if asin not in previous_ranks:
product = next(p for p in current_data if p['asin'] == asin)
changes["new_entries"].append({
"asin": asin,
"title": product.get("title"),
"rank": current_ranks[asin]
})
# 识别排名变化
for asin in current_ranks:
if asin in previous_ranks:
rank_change = previous_ranks[asin] - current_ranks[asin]
if rank_change > 0: # 排名上升
product = next(p for p in current_data if p['asin'] == asin)
changes["rank_up"].append({
"asin": asin,
"title": product.get("title"),
"from_rank": previous_ranks[asin],
"to_rank": current_ranks[asin],
"change": rank_change
})
elif rank_change < 0: # 排名下降
product = next(p for p in current_data if p['asin'] == asin)
changes["rank_down"].append({
"asin": asin,
"title": product.get("title"),
"from_rank": previous_ranks[asin],
"to_rank": current_ranks[asin],
"change": abs(rank_change)
})
# 识别跌出榜单的商品
for asin in previous_ranks:
if asin not in current_ranks:
product = next(p for p in previous_data if p['asin'] == asin)
changes["dropped_out"].append({
"asin": asin,
"title": product.get("title"),
"previous_rank": previous_ranks[asin]
})
return changes
def generate_report(self, changes: Dict) -> str:
"""生成变化报告"""
report = []
report.append("=" * 60)
report.append(f"Best Seller榜单变化报告")
report.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append("=" * 60)
if changes["new_entries"]:
report.append(f"\n🆕 新上榜商品 ({len(changes['new_entries'])}个):")
for item in changes["new_entries"][:5]: # 只显示前5个
report.append(f" #{item['rank']} - {item['title'][:50]}...")
if changes["rank_up"]:
report.append(f"\n📈 排名上升 ({len(changes['rank_up'])}个):")
for item in sorted(changes["rank_up"], key=lambda x: x['change'], reverse=True)[:5]:
report.append(f" {item['title'][:50]}...")
report.append(f" {item['from_rank']} → {item['to_rank']} (↑{item['change']})")
if changes["rank_down"]:
report.append(f"\n📉 排名下降 ({len(changes['rank_down'])}个):")
for item in sorted(changes["rank_down"], key=lambda x: x['change'], reverse=True)[:5]:
report.append(f" {item['title'][:50]}...")
report.append(f" {item['from_rank']} → {item['to_rank']} (↓{item['change']})")
if changes["dropped_out"]:
report.append(f"\n❌ 跌出榜单 ({len(changes['dropped_out'])}个):")
for item in changes["dropped_out"][:5]:
report.append(f" {item['title'][:50]}... (原排名#{item['previous_rank']})")
return "\n".join(report)
def run_monitor(self, category_url: str, interval_minutes: int = 60):
"""
运行监控任务
Args:
category_url: 要监控的类目URL
interval_minutes: 检查间隔(分钟)
"""
print(f"开始监控Best Seller榜单...")
print(f"检查间隔:{interval_minutes}分钟")
while True:
try:
# 获取当前榜单
current_data = self.fetch_bestseller_list(category_url)
if current_data:
# 获取历史数据
category_key = category_url.split('/')[-1]
previous_data = self.history.get(category_key, [])
if previous_data:
# 对比变化
changes = self.compare_rankings(current_data, previous_data)
# 生成报告
report = self.generate_report(changes)
print(report)
# 保存报告到文件
with open(f"data/report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", 'w', encoding='utf-8') as f:
f.write(report)
# 更新历史数据
self.history[category_key] = current_data
self.save_history()
# 等待下次检查
print(f"\n下次检查时间:{(datetime.now() + timedelta(minutes=interval_minutes)).strftime('%H:%M:%S')}")
time.sleep(interval_minutes * 60)
except KeyboardInterrupt:
print("\n监控已停止")
break
except Exception as e:
print(f"✗ 监控过程发生错误:{str(e)}")
time.sleep(60) # 出错后等待1分钟再继续
# 使用示例
if __name__ == "__main__":
client = PangolinAPIClient("[email protected]", "your_password")
if client.authenticate():
monitor = BestSellerMonitor(client)
# 监控电子产品类目的Best Seller榜单
category_url = "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics"
monitor.run_monitor(category_url, interval_minutes=60)
这个监控系统展示了如何将API调用、数据处理、变化检测和报告生成整合成一个完整的应用。在实际使用中,你可以进一步扩展功能,比如添加邮件或微信通知、将数据存储到数据库、生成可视化图表等。关键是要保持代码的模块化和可扩展性,这样后续添加新功能时不需要大规模重构。
实战项目二:竞品价格追踪与预警系统
价格是电商竞争的核心要素之一,及时掌握竞品的价格变化对于制定定价策略至关重要。我们将构建一个自动化的价格追踪系统,定期采集竞品价格,检测异常波动,并在价格变化超过阈值时发送预警。这个系统可以帮助卖家快速响应市场变化,避免因价格劣势导致的销量下滑。
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class PriceTracker:
"""竞品价格追踪器"""
def __init__(self, api_client: PangolinAPIClient):
self.client = api_client
self.price_history_file = "data/price_history.csv"
self.load_price_history()
def load_price_history(self):
"""加载价格历史数据"""
try:
self.price_df = pd.read_csv(self.price_history_file)
self.price_df['timestamp'] = pd.to_datetime(self.price_df['timestamp'])
except FileNotFoundError:
self.price_df = pd.DataFrame(columns=[
'timestamp', 'asin', 'title', 'price', 'currency'
])
def save_price_history(self):
"""保存价格历史"""
self.price_df.to_csv(self.price_history_file, index=False)
def track_price(self, asin: str) -> Optional[Dict]:
"""
追踪单个商品价格
Args:
asin: 商品ASIN
Returns:
Optional[Dict]: 价格信息
"""
product = self.client.fetch_product_detail(asin)
if product and product.get('price'):
price_str = product['price']
# 解析价格(处理$符号和逗号)
try:
price_value = float(price_str.replace('$', '').replace(',', ''))
price_record = {
'timestamp': datetime.now(),
'asin': asin,
'title': product.get('title', ''),
'price': price_value,
'currency': 'USD'
}
# 添加到历史记录
self.price_df = pd.concat([
self.price_df,
pd.DataFrame([price_record])
], ignore_index=True)
return price_record
except ValueError:
print(f"✗ 价格解析失败:{price_str}")
return None
return None
def batch_track_prices(self, asin_list: List[str]) -> List[Dict]:
"""批量追踪价格"""
results = []
for asin in asin_list:
price_record = self.track_price(asin)
if price_record:
results.append(price_record)
time.sleep(1) # 避免请求过快
self.save_price_history()
return results
def detect_price_changes(self, asin: str, threshold_percent: float = 5.0) -> Dict:
"""
检测价格变化
Args:
asin: 商品ASIN
threshold_percent: 变化阈值(百分比)
Returns:
Dict: 变化分析结果
"""
# 获取该商品的历史价格
product_prices = self.price_df[self.price_df['asin'] == asin].sort_values('timestamp')
if len(product_prices) < 2:
return {"status": "insufficient_data"}
latest_price = product_prices.iloc[-1]['price']
previous_price = product_prices.iloc[-2]['price']
price_change = latest_price - previous_price
price_change_percent = (price_change / previous_price) * 100
result = {
"asin": asin,
"title": product_prices.iloc[-1]['title'],
"current_price": latest_price,
"previous_price": previous_price,
"change_amount": price_change,
"change_percent": price_change_percent,
"status": "normal"
}
if abs(price_change_percent) >= threshold_percent:
result["status"] = "alert"
result["alert_type"] = "price_increase" if price_change > 0 else "price_decrease"
return result
def generate_price_report(self, asin_list: List[str]) -> str:
"""生成价格报告"""
report = []
report.append("=" * 60)
report.append("竞品价格追踪报告")
report.append(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append("=" * 60)
for asin in asin_list:
change_info = self.detect_price_changes(asin, threshold_percent=5.0)
if change_info.get("status") == "insufficient_data":
continue
report.append(f"\n商品:{change_info['title'][:50]}...")
report.append(f"ASIN:{asin}")
report.append(f"当前价格:${change_info['current_price']:.2f}")
report.append(f"上次价格:${change_info['previous_price']:.2f}")
if change_info['status'] == "alert":
symbol = "📈" if change_info['alert_type'] == "price_increase" else "📉"
report.append(f"{symbol} 价格变化:{change_info['change_percent']:+.2f}% (${change_info['change_amount']:+.2f})")
report.append("⚠️ 已触发价格预警!")
else:
report.append(f"价格变化:{change_info['change_percent']:+.2f}% (${change_info['change_amount']:+.2f})")
return "\n".join(report)
def get_price_trend(self, asin: str, days: int = 30) -> pd.DataFrame:
"""
获取价格趋势数据
Args:
asin: 商品ASIN
days: 天数
Returns:
pd.DataFrame: 价格趋势数据
"""
cutoff_date = datetime.now() - timedelta(days=days)
trend_data = self.price_df[
(self.price_df['asin'] == asin) &
(self.price_df['timestamp'] >= cutoff_date)
].sort_values('timestamp')
return trend_data
# 使用示例
if __name__ == "__main__":
client = PangolinAPIClient("[email protected]", "your_password")
if client.authenticate():
tracker = PriceTracker(client)
# 定义要追踪的竞品ASIN列表
competitor_asins = [
"B08N5WRWNW",
"B07ZPKN6YR",
"B08L5VFJ2R"
]
# 批量追踪价格
print("开始追踪竞品价格...")
tracker.batch_track_prices(competitor_asins)
# 生成报告
report = tracker.generate_price_report(competitor_asins)
print(report)
# 保存报告
with open(f"data/price_report_{datetime.now().strftime('%Y%m%d')}.txt", 'w', encoding='utf-8') as f:
f.write(report)
这个价格追踪系统的核心价值在于自动化和及时性。通过定时任务(可以使用cron或Windows任务计划程序)每天运行一次,你就能建立起完整的价格历史数据库,为定价决策提供数据支撑。进一步地,你可以结合机器学习算法预测价格趋势,或者根据竞品价格自动调整自己的定价策略,实现真正的智能化运营。
总结与最佳实践建议
通过本文的7个完整代码示例,我们系统地学习了Python调用Pangolin API的全部技能,从基础的环境配置、API认证,到高级的并发处理、错误重试,再到实战的监控系统和追踪系统。这些知识不仅适用于Pangolin API,也是开发任何API集成项目的通用方法论。
在实际应用中,有几个关键的最佳实践需要特别注意:首先是安全性,永远不要将API密钥硬编码在代码中,使用环境变量或加密的配置文件;其次是稳定性,完善的错误处理和重试机制是生产环境的必备要素;第三是性能,合理使用并发和缓存可以大幅提升系统效率;最后是可维护性,模块化的代码结构和清晰的文档注释能让项目长期健康发展。
如果你现在就想开始实践,建议按照以下步骤进行:第一,访问Pangolinfo官网注册账号并获取API密钥;第二,使用本文提供的代码示例搭建基础框架;第三,根据你的实际需求定制功能模块;第四,在小规模测试环境中验证系统稳定性;第五,逐步扩展到生产环境。记住,任何复杂的系统都是从简单的MVP开始迭代而来的,不要试图一次性实现所有功能,而是要持续优化和改进。
更多技术细节和API参数说明,请参考Pangolinfo API官方文档。如果在开发过程中遇到问题,可以访问开发者控制台查看API调用日志和积点使用情况。祝你开发顺利,用数据驱动业务增长!
立即开始Python API开发之旅 → 访问 Pangolinfo Scrape API 获取免费试用额度,或查看 完整API文档 了解更多技术细节。让Python代码为你的数据采集赋能!
