本文聚焦于亚马逊广告监控的工程实现:如何通过 Open Claw
亚马逊广告监控 Open Claw Pangolinfo SERP API 实时 SP 广告位追踪示意图

先说结论:亚马逊广告监控的核心价值和实现路径

亚马逊广告监控不是一个新话题,但真正能做到实时、准确、可告警的监控系统,大多数卖家团队都没有建立起来。根本原因不在于技术难度,而在于工具架构的错配——手动截图+Excel 记录这条路,对监控超过 20 个关键词的团队来说,时间成本高到难以持续。

用 Open Claw + Pangolinfo API 构建的亚马逊广告监控系统,可以实现:每 2-4 小时自动扫描核心关键词的 SERP 广告位,检测新竞品入场、占位者变化以及广告价格区间变动,当触发预设阈值时立即推送 Slack 或企业微信告警。这套方案的核心数据能力来自 Pangolinfo SERP API(SP 广告位 98% 采集率,业界领先),加上 Open Claw 的自然语言接口和工具调用机制,让整个监控逻辑可以用 Python 代码在几百行内实现,并且完全可定制。

本文直接从实现层面切入。如果你还没有完成 Open Claw 的基础部署和 Pangolinfo API Key 的申请获取,可以参考前序文章《Open Claw 在跨境电商的应用场景》和《Open Claw + Pangolinfo API 接入实战》——本文假设这两步已完成,专注于广告监控这个具体场景的工程实现。

亚马逊广告监控的核心数据维度:监控什么,为什么监控

在写任何代码之前,先把”监控什么”想清楚。亚马逊广告监控不等于监控广告花费——你自己账户的投放数据在广告后台都能看到。这里说的监控,指的是竞争维度的监控:搜索结果页上,你的核心关键词被哪些竞品占据了哪些位置,这个格局每天、每小时在怎么变化。

亚马逊搜索结果页的 SP(Sponsored Products)广告位,通常分为顶部(Top of Search)、中部和底部。顶部广告位的点击率是其他位置的 3-5 倍,因此竞品在顶部的出现频率,直接反映了它们的广告预算优先级和竞争激烈程度。当一个新的竞品开始频繁占据你核心关键词的顶部第一位,往往意味着:它在测试大幅提升出价、可能即将进行大促、或者在通过广告拉动 BSR 快速积累销量。这三种情况都需要你的团队做出响应决策。

因此,一个完整的亚马逊广告监控系统需要追踪的核心维度包括:某关键词在特定时间点的广告位 Top 3 分别是哪些 ASIN,它们各自出现了多少次(在多次抓取中的出现频率),它们的价格区间是多少,以及和上一个时间点相比,有哪些 ASIN 新出现、哪些不见了。这四条维度,配合适当的历史数据积累,就能构建出一套有实际价值的竞品广告情报系统。

关键词监控清单的设计原则

关键词清单是整个监控系统的”燃料”,它的质量直接决定监控系统的信噪比。通常来说,一个中型卖家团队维护 20-50 个核心监控关键词是合理区间:覆盖自家产品的主词和 2-3 个强竞争词,品类内的类目关键词,以及正在测试的新词。超过 100 个关键词的监控清单,如果没有按优先级分层,很快会演变成信息噪音——所有词同等频率监控、同等权重告警,结果是每天产生大量价值密度低的通知,团队开始忽视这些通知,系统实际上等于失效。

建议的分层结构:A 类词(核心品词,每 2 小时一次高频监控,变化即告警)、B 类词(竞争类目词,每 6 小时一次,显著变化才告警)、C 类词(探索性词,每 24 小时一次,写入数据库但不主动告警)。这种分层既控制 API 调用成本,也保证了团队注意力集中在真正重要的信号上。

用 Pangolinfo SERP API 采集实时 SP 广告位数据

Pangolinfo 的 SERP Data API 是亚马逊广告监控数据层的核心,支持对指定关键词在亚马逊搜索结果页进行结构化抓取,返回包含广告位排名、ASIN、标题、品牌、价格、评分在内的完整数据包。其 SP 广告位 98% 的采集率(业界第一),确保了监控系统不会因为数据缺失产生误判——这一点在竞争激烈的头部类目尤为重要,因为竞品广告位的出现与否,有时就是告警触发与否的边界。

以下是封装 Pangolinfo SERP API 调用的核心数据采集模块。注意异步实现(asyncio + aiohttp)在批量关键词监控场景中几乎是必选——对于 50 个关键词的同步顺序请求,完成一轮需要 5-10 分钟;异步并发后,同样数量通常在 30-60 秒内完成。


# 亚马逊广告监控 - 数据采集模块
# 基于 Pangolinfo SERP API 实现实时 SP 广告位数据抓取

import asyncio
import aiohttp
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Dict

logger = logging.getLogger(__name__)

PANGOLINFO_API_KEY = "your_pangolinfo_api_key"
SERP_API_ENDPOINT = "https://api.pangolinfo.com/v1/serp"


@dataclass
class AdPosition:
    """单条广告位数据结构"""
    rank: int                         # 广告位排名(1, 2, 3...)
    position_type: str                # 'top' / 'middle' / 'bottom'
    asin: str
    title: str
    brand: str
    price: Optional[float]
    rating: Optional[float]
    review_count: Optional[int]
    is_sponsored: bool = True

@dataclass
class SerpSnapshot:
    """一次 SERP 快照的完整数据"""
    keyword: str
    marketplace: str
    captured_at: str
    top_ads: List[AdPosition] = field(default_factory=list)
    total_ad_count: int = 0
    query_success: bool = True
    error_msg: Optional[str] = None


async def fetch_serp_snapshot(
    session: aiohttp.ClientSession,
    keyword: str,
    marketplace: str = "US",
    semaphore: asyncio.Semaphore = None
) -> SerpSnapshot:
    """
    抓取指定关键词的 Amazon SERP 广告位快照

    Args:
        keyword: 监控关键词,如 "wireless earbuds under $30"
        marketplace: 站点代码,US/UK/DE/JP 等
        semaphore: 并发控制信号量
    """
    if semaphore:
        async with semaphore:
            return await _do_fetch(session, keyword, marketplace)
    return await _do_fetch(session, keyword, marketplace)


async def _do_fetch(
    session: aiohttp.ClientSession,
    keyword: str,
    marketplace: str
) -> SerpSnapshot:
    headers = {
        "Authorization": f"Bearer {PANGOLINFO_API_KEY}",
        "Content-Type": "application/json"
    }

    payload = {
        "source": "amazon_search",
        "query": keyword,
        "marketplace": marketplace,
        "page": 1,
        "include_sponsored": True,       # 必须开启,这是广告监控的核心
        "include_organic": False,         # 广告监控场景不需要自然位数据
        "output_format": "json"
    }

    captured_at = datetime.now(timezone.utc).isoformat()

    try:
        async with session.post(
            SERP_API_ENDPOINT,
            headers=headers,
            json=payload,
            timeout=aiohttp.ClientTimeout(total=30)
        ) as resp:
            resp.raise_for_status()
            raw = await resp.json()

        sponsored_items = raw.get("sponsored_results", [])
        ad_positions = []

        for item in sponsored_items[:10]:  # 只取前 10 个广告位
            ad_positions.append(AdPosition(
                rank=item.get("ad_rank", 0),
                position_type=item.get("ad_placement", "unknown"),  # top/middle/bottom
                asin=item.get("asin", ""),
                title=item.get("title", ""),
                brand=item.get("brand", ""),
                price=item.get("price"),
                rating=item.get("rating"),
                review_count=item.get("review_count"),
                is_sponsored=True
            ))

        # 按位置类型排序:顶部优先
        top_ads = sorted(
            [a for a in ad_positions if a.position_type == "top"],
            key=lambda x: x.rank
        )

        return SerpSnapshot(
            keyword=keyword,
            marketplace=marketplace,
            captured_at=captured_at,
            top_ads=top_ads,
            total_ad_count=len(ad_positions),
            query_success=True
        )

    except asyncio.TimeoutError:
        logger.warning(f"SERP fetch timeout: keyword='{keyword}'")
        return SerpSnapshot(keyword=keyword, marketplace=marketplace,
                           captured_at=captured_at,
                           query_success=False, error_msg="timeout")
    except Exception as e:
        logger.error(f"SERP fetch error: keyword='{keyword}', error={e}")
        return SerpSnapshot(keyword=keyword, marketplace=marketplace,
                           captured_at=captured_at,
                           query_success=False, error_msg=str(e))


async def batch_fetch_serp(
    keyword_list: List[str],
    marketplace: str = "US",
    max_concurrent: int = 8
) -> List[SerpSnapshot]:
    """
    批量抓取关键词 SERP,自动并发控制

    Args:
        keyword_list: 关键词列表,建议不超过 100 个
        marketplace: 市场代码
        max_concurrent: 最大并发数,避免触发速率限制
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_serp_snapshot(session, kw, marketplace, semaphore)
            for kw in keyword_list
        ]
        results = await asyncio.gather(*tasks, return_exceptions=False)

    success_count = sum(1 for r in results if r.query_success)
    logger.info(f"SERP 批量采集完成: {success_count}/{len(keyword_list)} 成功")

    return results
    

上面这个采集模块的几个设计决策值得说明。首先,`include_organic: False` 这个参数降低了单次请求的数据量和解析负担——纯广告监控场景不需要自然位数据,但如果你的监控系统同时要追踪关键词的自然排名变化,可以把这个参数改为 True。其次,并发数 `max_concurrent` 默认设置为 8,这是在 Pangolinfo SERP API 的标准速率限制和实际抓取时效之间的经验值;如果你的订阅套餐支持更高并发,可以适当提升。

亚马逊广告监控工作流架构图:Pangolinfo SERP API 采集 SP 广告位数据,Open Claw Agent 完成分析与告警
亚马逊广告监控完整工作流——关键词配置层→Pangolinfo SERP API 实时采集(SP广告位98%覆盖)→Open Claw Agent 变化分析→多渠道告警输出

Open Claw 广告监控工作流:变化检测与智能告警

数据采集是基础,真正体现亚马逊广告监控价值的,是变化检测逻辑和告警策略。一个新的 ASIN 出现在你竞争最激烈的核心词顶部广告位,和它出现在一个长尾词底部广告位,这两件事的业务含义完全不同,但如果告警规则写得不好,都会触发同级别的通知,造成告警疲劳。

Open Claw 在这个环节的价值在于:它不只是调用 API 获取数据那么简单,LLM 推理层可以对采集到的变化做初步解读,用自然语言描述这次变化的可能含义,帮助运营团队更快地判断是否需要响应以及如何响应。比如,当检测到竞品 ASIN B08XXXXXX 从广告位第 5 跃升至第 1,且价格同时降低了 15%,LLM 可以自动生成一段分析:”该竞品可能启动了大促配合广告推量的组合策略,历史数据显示它上次出现类似操作是在 Prime Day 前 48 小时……”——这类上下文信息,是纯规则引擎给不出来的。

比较逻辑:当前快照 vs 历史基准线


# 亚马逊广告监控 - 变化检测与告警模块

from dataclasses import dataclass
from typing import List, Set, Dict, Optional
from enum import Enum

class AlertSeverity(Enum):
    CRITICAL = "CRITICAL"   # 核心词出现全新强度竞品
    HIGH = "HIGH"           # 主要广告位发生显著变化
    MEDIUM = "MEDIUM"       # 次要广告位变化或价格调整
    LOW = "LOW"             # 长尾词变化,记录不告警


@dataclass
class AdAlert:
    keyword: str
    severity: AlertSeverity
    alert_type: str
    message: str
    affected_asin: str
    previous_state: Optional[str] = None
    current_state: Optional[str] = None
    triggered_at: str = ""


class SerpChangeDetector:
    """
    SERP 广告位变化检测器
    对比当前快照与历史基准线,输出结构化告警列表
    """

    def __init__(self, alert_config: Dict):
        """
        alert_config 示例:
        {
            "new_top1_entrant": "CRITICAL",      # 新 ASIN 占据第1位
            "top3_new_entrant": "HIGH",          # 新 ASIN 进入 Top 3
            "top3_disappear": "MEDIUM",          # Top 3 原有 ASIN 消失
            "price_drop_pct": 10,                # 价格降幅超过10%触发
            "keyword_priority": {                # 关键词优先级:影响告警级别
                "A": ["wireless earbuds", "bluetooth speaker"],
                "B": ["earbuds under 30", "tws earbuds 2026"],
                "C": []
            }
        }
        """
        self.config = alert_config
        self.kw_priority = self._build_priority_map(
            alert_config.get("keyword_priority", {})
        )

    def _build_priority_map(self, priority_config: Dict) -> Dict[str, str]:
        result = {}
        for tier, keywords in priority_config.items():
            for kw in keywords:
                result[kw.lower()] = tier
        return result

    def _get_priority_tier(self, keyword: str) -> str:
        return self.kw_priority.get(keyword.lower(), "B")

    def _adjust_severity(self, base_severity: AlertSeverity,
                          keyword: str) -> AlertSeverity:
        """根据关键词优先级调整告警等级"""
        tier = self._get_priority_tier(keyword)
        severity_map = {
            "A": {  # A 类词:升级告警
                AlertSeverity.MEDIUM: AlertSeverity.HIGH,
                AlertSeverity.HIGH: AlertSeverity.CRITICAL,
            },
            "C": {  # C 类词:降级不告警
                AlertSeverity.HIGH: AlertSeverity.LOW,
                AlertSeverity.CRITICAL: AlertSeverity.MEDIUM,
            }
        }
        upgrades = severity_map.get(tier, {})
        return upgrades.get(base_severity, base_severity)

    def compare(self,
                current: "SerpSnapshot",
                baseline: Optional["SerpSnapshot"]) -> List[AdAlert]:
        """
        对比当前快照与基准线,输出告警列表

        baseline 为 None 时(首次运行),只记录不告警
        """
        if baseline is None:
            return []  # 首次运行建立基准线,无告警

        alerts = []
        keyword = current.keyword

        current_top3_asins: Set[str] = {
            a.asin for a in current.top_ads[:3]
        }
        baseline_top3_asins: Set[str] = {
            a.asin for a in baseline.top_ads[:3]
        }
        current_top1 = current.top_ads[0].asin if current.top_ads else None
        baseline_top1 = baseline.top_ads[0].asin if baseline.top_ads else None

        # ── 检测1:顶部第1位发生了变化 ─────────────────────────────
        if current_top1 and current_top1 != baseline_top1:
            is_new = current_top1 not in baseline_top3_asins
            base_sev = AlertSeverity.CRITICAL if is_new else AlertSeverity.HIGH
            severity = self._adjust_severity(base_sev, keyword)
            alerts.append(AdAlert(
                keyword=keyword,
                severity=severity,
                alert_type="top1_position_change",
                message=(
                    f"关键词「{keyword}」广告第1位变化:\n"
                    f"  旧: {baseline_top1 or 'N/A'}\n"
                    f"  新: {current_top1}" +
                    ("(全新竞品,历史未出现过 Top 3)" if is_new else "(位置提升)")
                ),
                affected_asin=current_top1,
                previous_state=baseline_top1,
                current_state=current_top1,
                triggered_at=current.captured_at
            ))

        # ── 检测2:Top 3 中出现全新竞品 ────────────────────────────
        new_entrants = current_top3_asins - baseline_top3_asins - {current_top1}
        for asin in new_entrants:
            rank = next((a.rank for a in current.top_ads if a.asin == asin), -1)
            severity = self._adjust_severity(AlertSeverity.HIGH, keyword)
            alerts.append(AdAlert(
                keyword=keyword,
                severity=severity,
                alert_type="new_top3_entrant",
                message=(
                    f"关键词「{keyword}」新竞品进入 Top 3 广告位(位置#{rank}):{asin}"
                ),
                affected_asin=asin,
                triggered_at=current.captured_at
            ))

        # ── 检测3:Top 3 原有竞品消失 ───────────────────────────────
        disappeared = baseline_top3_asins - current_top3_asins
        for asin in disappeared:
            severity = self._adjust_severity(AlertSeverity.MEDIUM, keyword)
            alerts.append(AdAlert(
                keyword=keyword,
                severity=severity,
                alert_type="top3_disappear",
                message=(
                    f"关键词「{keyword}」竞品 {asin} 退出 Top 3 广告位,可能缩减预算"
                ),
                affected_asin=asin,
                triggered_at=current.captured_at
            ))

        # ── 检测4:价格大幅变化 ──────────────────────────────────────
        price_drop_threshold = self.config.get("price_drop_pct", 10) / 100
        baseline_price_map = {a.asin: a.price for a in baseline.top_ads if a.price}

        for ad in current.top_ads[:3]:
            if not ad.price or ad.asin not in baseline_price_map:
                continue
            old_price = baseline_price_map[ad.asin]
            if old_price > 0:
                drop_pct = (old_price - ad.price) / old_price
                if drop_pct >= price_drop_threshold:
                    severity = self._adjust_severity(AlertSeverity.HIGH, keyword)
                    alerts.append(AdAlert(
                        keyword=keyword,
                        severity=severity,
                        alert_type="significant_price_drop",
                        message=(
                            f"关键词「{keyword}」广告竞品 {ad.asin} 降价 "
                            f"{drop_pct*100:.1f}%:${old_price:.2f} → ${ad.price:.2f}"
                        ),
                        affected_asin=ad.asin,
                        previous_state=f"${old_price:.2f}",
                        current_state=f"${ad.price:.2f}",
                        triggered_at=current.captured_at
                    ))

        return alerts
    

LLM 加持:让告警更有可读性

上面的 `SerpChangeDetector` 输出的是结构化告警数据,可以直接推送 Slack webhook。但对于 CRITICAL 级别的告警,可以把相关数据传给 Open Claw 的 LLM 层做一次自然语言解读,让通知内容包含背景分析而不只是干燥的数据变化记录。


# Open Claw 广告监控 - LLM 告警解读集成

from anthropic import Anthropic

client = Anthropic()

def enrich_alert_with_llm(alert: AdAlert,
                           current_snapshot: "SerpSnapshot",
                           history: List["SerpSnapshot"]) -> str:
    """
    对 HIGH/CRITICAL 告警进行 LLM 语义解读
    返回人类可读的分析摘要
    """
    # 构建历史上下文
    history_summary = "\n".join([
        f"  {snap.captured_at[:16]}: Top3 = {[a.asin for a in snap.top_ads[:3]]}"
        for snap in history[-5:]  # 最近5次快照
    ])

    current_top3_detail = "\n".join([
        f"  #{a.rank} 位: {a.asin} | {a.brand} | ${a.price or 'N/A'} | "
        f"⭐{a.rating or 'N/A'} ({a.review_count or 0}评)"
        for a in current_snapshot.top_ads[:3]
    ])

    prompt = f"""
你是一位亚马逊广告运营专家,请分析以下告警并给出简洁的业务判断。

关键词:{alert.keyword}
告警类型:{alert.alert_type}
告警描述:{alert.message}

当前 Top 3 广告位详情:
{current_top3_detail}

近期历史快照(Top 3 ASIN 变化):
{history_summary}

请用 100 字以内给出:
1. 这次变化最可能的原因(广告策略调整/大促预热/新品冲量等)
2. 建议运营团队需要关注的 1-2 个最重要事项
输出要直接、务实,避免废话。
"""

    response = client.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}]
    )

    return response.content[0].text
    

完整监控调度:定时运行与多渠道告警通知

采集和检测逻辑都就位之后,最后一层是调度层——让整个系统按照预设频率自动运行,并把告警推送到团队实际使用的沟通渠道。以下是一个完整的调度器实现,支持关键词按优先级分层执行,并对不同严重度的告警走不同的通知渠道。


# 完整亚马逊广告监控调度器

import asyncio
import json
import sqlite3
import aiohttp
from datetime import datetime, timezone
from typing import List, Dict

class AdMonitorScheduler:
    """
    亚马逊广告监控调度器
    负责按优先级分批执行监控扫描,持久化数据,分发告警
    """

    def __init__(self, config: Dict):
        self.config = config
        self.db_path = config.get("db_path", "ad_monitor.db")
        self._init_db()

        self.scraper = None   # AsyncPangolinScraper 实例(import from 采集模块)
        self.detector = SerpChangeDetector(config.get("alert_config", {}))

    def _init_db(self):
        """初始化 SQLite 数据库(轻量存储,生产环境建议换 PostgreSQL)"""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS serp_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT NOT NULL,
                marketplace TEXT NOT NULL,
                captured_at TEXT NOT NULL,
                top_ads_json TEXT NOT NULL,
                total_ad_count INTEGER
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS alerts_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT,
                severity TEXT,
                alert_type TEXT,
                message TEXT,
                affected_asin TEXT,
                triggered_at TEXT,
                notified INTEGER DEFAULT 0
            )
        """)
        conn.commit()
        conn.close()

    def _save_snapshot(self, snapshot: "SerpSnapshot"):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT INTO serp_snapshots
            (keyword, marketplace, captured_at, top_ads_json, total_ad_count)
            VALUES (?, ?, ?, ?, ?)
        """, (
            snapshot.keyword,
            snapshot.marketplace,
            snapshot.captured_at,
            json.dumps([vars(a) for a in snapshot.top_ads]),
            snapshot.total_ad_count
        ))
        conn.commit()
        conn.close()

    def _load_latest_snapshot(self, keyword: str,
                               marketplace: str) -> Optional["SerpSnapshot"]:
        """加载指定关键词的最新历史快照作为基准线"""
        conn = sqlite3.connect(self.db_path)
        row = conn.execute("""
            SELECT top_ads_json, captured_at FROM serp_snapshots
            WHERE keyword = ? AND marketplace = ?
            ORDER BY captured_at DESC LIMIT 1
        """, (keyword, marketplace)).fetchone()
        conn.close()

        if not row:
            return None

        ads_data = json.loads(row[0])
        top_ads = [AdPosition(**d) for d in ads_data]
        return SerpSnapshot(
            keyword=keyword,
            marketplace=marketplace,
            captured_at=row[1],
            top_ads=top_ads
        )

    async def run_monitoring_cycle(
        self,
        keyword_tier: str = "A",
        marketplace: str = "US"
    ):
        """执行一轮监控扫描(按 Tier 分批执行)"""
        priority_config = self.config.get("alert_config", {}).get(
            "keyword_priority", {}
        )
        keywords = priority_config.get(keyword_tier, [])

        if not keywords:
            logger.info(f"Tier {keyword_tier} 无关键词,跳过")
            return

        logger.info(f"开始 Tier {keyword_tier} 监控扫描,{len(keywords)} 个关键词")

        # 批量采集 SERP
        snapshots = await batch_fetch_serp(keywords, marketplace)

        all_alerts = []
        for snapshot in snapshots:
            if not snapshot.query_success:
                continue

            # 保存当前快照
            self._save_snapshot(snapshot)

            # 加载基准线对比
            baseline = self._load_latest_snapshot(
                snapshot.keyword, marketplace
            )
            alerts = self.detector.compare(snapshot, baseline)
            all_alerts.extend(alerts)

        # 按严重度分发告警
        await self._dispatch_alerts(all_alerts)

        logger.info(f"Tier {keyword_tier} 扫描完成,共触发 {len(all_alerts)} 条告警")

    async def _dispatch_alerts(self, alerts: List["AdAlert"]):
        """根据告警级别分发到不同渠道"""
        slack_alerts = [a for a in alerts
                        if a.severity.value in ("CRITICAL", "HIGH")]
        email_alerts = [a for a in alerts
                        if a.severity.value == "MEDIUM"]

        if slack_alerts:
            await self._send_slack_batch(slack_alerts)

        if email_alerts:
            # 汇聚 MEDIUM 告警为日报,避免频繁通知
            logger.info(f"{len(email_alerts)} 条 MEDIUM 告警已写入日报队列")

    async def _send_slack_batch(self, alerts: List["AdAlert"]):
        """批量发送 Slack 告警"""
        slack_webhook = self.config.get("slack_webhook_url")
        if not slack_webhook:
            return

        for alert in alerts:
            emoji = "🚨" if alert.severity.value == "CRITICAL" else "⚠️"
            text = (
                f"{emoji} *[亚马逊广告监控]* `{alert.severity.value}`\n"
                f"关键词:`{alert.keyword}`\n"
                f"事件:{alert.alert_type}\n"
                f"详情:{alert.message}\n"
                f"时间:{alert.triggered_at[:16]} UTC"
            )

            async with aiohttp.ClientSession() as session:
                await session.post(slack_webhook,
                                   json={"text": text},
                                   timeout=aiohttp.ClientTimeout(total=10))


# 使用示例:在 cron / celery / APScheduler 中调用
async def main():
    config = {
        "db_path": "/data/ad_monitor.db",
        "slack_webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK",
        "alert_config": {
            "new_top1_entrant": "CRITICAL",
            "price_drop_pct": 12,
            "keyword_priority": {
                "A": ["wireless earbuds", "bluetooth earbuds", "tws earbuds"],
                "B": ["earbuds under 30", "earbuds for running"],
                "C": []
            }
        }
    }

    scheduler = AdMonitorScheduler(config)

    # Tier A:每2小时执行
    await scheduler.run_monitoring_cycle(keyword_tier="A")

if __name__ == "__main__":
    asyncio.run(main())
    

生产部署注意事项

在实际部署亚马逊广告监控系统时,有三个容易被低估的细节。第一是数据库选型:上面示例使用 SQLite,适合单机部署和小团队快速验证,但如果关键词数量超过 100 个、监控频率在 2 小时以内,建议切换到 PostgreSQL——SQLite 的写锁在高频写入场景会产生性能瓶颈。第二是 API 调用成本控制:按照上文的分层策略(A 类 2 小时、B 类 6 小时、C 类 24 小时),50 个关键词每天约产生 200-300 次 SERP API 调用,在 Pangolinfo 的标准套餐内通常完全覆盖,但如果监控关键词规模扩大,建议定期审查哪些词实际产生过有价值的告警,及时清理低价值的 C 类词。第三是告警去重:同一个 ASIN 在同一关键词的位置变化,如果 2 小时内触发了多次,系统应该有去重逻辑,避免运营收到重复通知。可以在 `_dispatch_alerts` 中加一个”同一 keyword + alert_type + asin 在 6 小时内只发一次”的过滤规则。

写在最后:亚马逊广告监控的数据壁垒,就是你的竞争壁垒

亚马逊广告竞争的本质,是信息不对称下的出价博弈。当你的团队能在竞品调整广告策略后 30 分钟内得到通知,而对手的运营要等到第二天早上开会才看到这个变化,这个时间差就是你每次能先一步响应的窗口期。持续积累的广告历史数据,也会逐步构建出你自己的竞品广告行为画像——哪个 ASIN 会在大促前 48 小时突增投放、哪个品牌习惯在周末下调价格配合单日冲量……这些模式,是最终沉淀下来的真正竞争护城河。

用 Open Claw + Pangolinfo SERP API 构建的这套亚马逊广告监控系统,总代码量在 500 行左右,一个有 Python 基础的技术运营可以在 1-2 天内完成部署和调试。数据层由 Pangolinfo 的 SP 广告位 98% 采集率保驾护航,分析层由 Open Claw 的 LLM 工具调用提供灵活的扩展能力。这是当前技术成本和运营效益之间最合理的配比之一。

如果你对更高级的应用场景感兴趣——比如将广告监控数据与 BSR 趋势、评论速率联动分析,或者建立关键词广告竞争强度的动态评分模型——这些都可以在当前系统架构上逐步扩展,无需推倒重来。

立即接入 Pangolinfo Scrape API,获取亚马逊实时 SERP 广告位数据,为你的广告监控系统提供最准确的数据基础。如需查阅完整 API 参数文档,请访问 Pangolinfo SERP API 文档

如有企业定制需求(多站点联合监控、仪表盘集成、数据分析服务),欢迎通过 Pangolinfo 官网 联系我们的解决方案团队。

解决方案

为电商场景打造的高可用数据采集 API,自动规避 IP 封禁、验证码拦截、代理故障等爬虫难题,无需复杂配置即可快速获取精准、稳定的电商数据。

AMZ Data Tracker 是亚马逊卖家专属的全方位运营工具,集关键词调研、竞品销量追踪、Listing 优化、恶意跟卖与差评监控于一体,助力卖家数据化决策,高效提升店铺销量与排名。

每周教程

准备好开始您的数据采集之旅了吗?

注册免费账户,立即体验强大的网页数据采集API,无需信用卡。

微信扫一扫
与我们联系

QR Code
快速测试

联系我们,您的问题,我们随时倾听

无论您在使用 Pangolin 产品的过程中遇到任何问题,或有任何需求与建议,我们都在这里为您提供支持。请填写以下信息,我们的团队将尽快与您联系,确保您获得最佳的产品体验。

Talk to our team

If you encounter any issues while using Pangolin products, please fill out the following information, and our team will contact you as soon as possible to ensure you have the best product experience.