A Guide to Best Practices in Amazon Data Scraping: The Complete Tutorial and API Solution

This comprehensive guide provides a deep dive into the best practices for Amazon data scraping. It addresses core challenges such as Amazon's sophisticated anti-scraping mechanisms, dynamic page structures, and data accuracy issues. The article details technical solutions including IP rotation, header management, session handling, and advanced data parsing for dynamic content, product variations, and reviews. Furthermore, it compares in-house scraping with professional API solutions, highlighting the latter's advantages in cost, maintenance, and data quality. The guide concludes with practical case studies on product selection, competitor monitoring, and market trend prediction, offering a complete roadmap for leveraging Amazon data effectively.
Infographic showing the process of Amazon data scraping with data streams flowing from a shopping cart to a server, illustrating API solutions and anti-scraping techniques.

In today’s competitive e-commerce environment, data is money. Whether you want to monitor competitor pricing strategies, analyze market trends, or conduct product selection, Amazon data scraping has become an indispensable skill for cross-border e-commerce professionals. However, many people frequently encounter problems when performing Amazon data scraping: low scraping efficiency, inaccurate data, frequent anti-scraping blocks, and high maintenance costs. These pain points turn data scraping work that was supposed to enhance business efficiency into a nightmare for technical teams.

Why does this happen? The root of the problem often lies in an insufficient understanding of Amazon data scraping and the lack of a systematic methodology. This article provides a detailed analysis of best practices for Amazon data scraping across multiple dimensions, including technical implementation, anti-scraping strategies, and data parsing, to help you solve the various challenges in the data acquisition process.

Core Challenges in Amazon Data Scraping

The Complex Evolution of Anti-Scraping Mechanisms

As one of the world’s largest e-commerce platforms, Amazon’s anti-scraping system has become quite mature after years of iteration. Unlike the simple rate limiting of ordinary websites, Amazon’s anti-scraping strategy is multi-layered and multi-dimensional. First is the basic IP frequency limit, which triggers a temporary ban when a single IP’s request frequency is too high. More complex is the user behavior analysis, where the system detects metrics like mouse trajectory, page dwell time, and click patterns to determine if the behavior is robotic.
What’s more troublesome is that Amazon also makes comprehensive judgments based on factors like geographic location, device fingerprint, and browser characteristics. Even if you use a proxy IP, you can still be identified and have your access restricted if other characteristics expose automated behavior. This multi-dimensional detection mechanism often causes traditional scraping methods to fail, requiring more refined countermeasures.
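
Because detection is multi-dimensional, a practical first step is simply recognizing when a request has been flagged so the scraper can back off instead of burning through IPs. The check below is a minimal sketch; the status codes and text markers are ones commonly reported for Amazon's interstitial robot-check page, so treat them as assumptions and verify them against the responses you actually receive.

def looks_blocked(response):
    """Heuristically decide whether Amazon served a block or captcha page."""
    if response is None or response.status_code in (429, 503):
        return True
    body = response.text
    # Markers often seen on Amazon's robot-check page (assumed, verify in practice)
    captcha_markers = [
        'Robot Check',
        'Enter the characters you see below',
        'api-services-support@amazon.com',
    ]
    return any(marker in body for marker in captcha_markers)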

Dynamic Changes in Page Structure

Amazon’s page structure is not static. The platform regularly conducts A/B testing, and different users may see completely different page layouts. More importantly, Amazon dynamically adjusts page content based on user behavior patterns, purchase history, geographic location, and other factors. This means your scraper program, which runs normally today, may fail tomorrow due to changes in the page structure.
This problem is particularly prominent when dealing with product detail pages. When accessing the same ASIN at different times, not only will dynamic information such as price and inventory change, but even the page’s DOM structure may be adjusted. Traditional data extraction methods based on XPath or CSS selectors are extremely fragile in the face of such changes and require frequent maintenance and adjustments.

Data Consistency and Accuracy Challenges

During the Amazon data scraping process, data consistency and accuracy are another major challenge. Due to the complexity of the Amazon platform, the same product information may display inconsistent data on different pages. For example, the price displayed on the search results page may differ from that on the detail page, and the number of ratings may also vary.
A more complex situation is that Amazon displays different delivery information and prices based on the user’s zip code. This requires considering geographic location factors when performing Amazon data scraping. If these variables are not handled correctly, the scraped data is likely to be inaccurate or incomplete, which is fatal for businesses that rely on data for decision-making.
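
One pragmatic defense is to cross-validate the same field from two sources before trusting it, for example the price seen on the search results page versus the detail page for the same ASIN under the same zip code. The helper below is a hypothetical sketch of that idea; the field names and tolerance are placeholders for whatever your own parsers and accuracy requirements look like.

def reconcile_price(asin, search_price, detail_price, tolerance=0.01):
    """Compare the search-page price with the detail-page price for one ASIN."""
    if search_price is None or detail_price is None:
        # One source failed to return a price: keep what we have, mark incomplete
        return {'asin': asin, 'price': detail_price or search_price,
                'status': 'incomplete'}
    if abs(search_price - detail_price) <= tolerance * max(search_price, detail_price):
        # The two pages agree within tolerance, so the detail price is trusted
        return {'asin': asin, 'price': detail_price, 'status': 'consistent'}
    # Prices disagree: record both values and schedule a re-scrape instead of storing bad data
    return {'asin': asin, 'search_price': search_price,
            'detail_price': detail_price, 'status': 'mismatch'}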

Best Practices in Technical Implementation

Request Rate Control and IP Rotation Strategy

When performing Amazon data scraping, reasonable request rate control is fundamental to avoiding bans. Based on practical testing experience, a single IP's request rate on the main Amazon site is best kept within 10-15 requests per minute. However, this value is not absolute and needs to be adjusted according to the specific type of page being scraped; for example, product search pages usually tolerate less than detail pages.
The IP rotation strategy needs to consider multiple factors. The first is the quality of the IP pool; residential IPs usually perform better than datacenter IPs, but they also cost more. The second is the timing of rotation; it should not be a simple fixed-interval rotation but should be dynamically adjusted based on indicators such as response status and response time.

import requests
import time
import random
from itertools import cycle

class AmazonScraper:
    def __init__(self, proxy_list):
        self.proxy_cycle = cycle(proxy_list)
        self.session = requests.Session()
        self.last_request_time = 0
        # Pool of desktop user agents used by _get_random_user_agent()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ]

    def _get_random_user_agent(self):
        # Return a random user agent so successive requests do not share one fingerprint
        return random.choice(self.user_agents)

    def get_with_rotation(self, url, min_interval=4):
        # Control the request interval
        elapsed = time.time() - self.last_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed + random.uniform(0.5, 1.5))

        # Rotate proxy
        proxy = next(self.proxy_cycle)
        proxies = {
            'http': proxy,
            'https': proxy
        }

        # Simulate a real browser request
        headers = {
            'User-Agent': self._get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

        try:
            response = self.session.get(url, proxies=proxies, headers=headers, timeout=10)
            self.last_request_time = time.time()
            return response
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None
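
The sketch above uses a fixed minimum interval with random jitter. The dynamic adjustment mentioned earlier, slowing down when responses degrade and cautiously speeding up after a run of clean responses, can be layered on top with a small adaptive throttle. The class below is a hypothetical addition, not part of the original scraper:

class AdaptiveThrottle:
    def __init__(self, base_interval=4.0, max_interval=120.0):
        self.base_interval = base_interval
        self.max_interval = max_interval
        self.current_interval = base_interval
        self.success_streak = 0

    def register_response(self, response):
        # Treat missing responses, 429/503 status codes, and captcha pages as throttling signals
        blocked = (
            response is None
            or response.status_code in (429, 503)
            or 'Robot Check' in getattr(response, 'text', '')
        )
        if blocked:
            # Back off exponentially while Amazon is pushing back
            self.current_interval = min(self.current_interval * 2, self.max_interval)
            self.success_streak = 0
        else:
            # After ten clean responses, ease back toward the base interval
            self.success_streak += 1
            if self.success_streak >= 10:
                self.current_interval = max(self.base_interval,
                                            self.current_interval * 0.8)
                self.success_streak = 0
        return self.current_interval

One way to wire this in is to call register_response() after every request in get_with_rotation() and pass the returned value back in as min_interval; how aggressively to back off is a tuning decision that depends on the quality of your proxy pool.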

Sophisticated Spoofing of User-Agent and Request Headers

Merely rotating IPs is not enough; spoofing request headers is equally important. Amazon’s anti-scraping system checks multiple fields such as User-Agent, Accept, and Accept-Language. A common mistake is using a User-Agent that is too simple or has an obvious scraper identifier.
Furthermore, the combination of request headers also matters. Real browser request headers follow specific patterns: for example, a Chrome browser's Accept-Encoding usually includes gzip and deflate, and the Accept field has a fixed format. If these details do not match, the traffic is easily flagged as automated.

import random

class RequestHeaderManager:
    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ]

        self.accept_languages = [
            'en-US,en;q=0.9',
            'en-US,en;q=0.8,zh-CN;q=0.6',
            'en-GB,en;q=0.9,en-US;q=0.8'
        ]

    def get_headers(self):
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': random.choice(self.accept_languages),
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0'
        }

Session Management and Cookie Handling

When performing Amazon data scraping, proper session management is a key factor in increasing the success rate. Amazon uses cookies to track user sessions, including geographic location information, language preferences, shopping cart status, etc. If each request is a completely new session, it not only increases the risk of being identified but may also prevent you from obtaining complete data.
It is particularly important to note that some of Amazon’s cookies have an expiration time and need to be re-acquired after they expire. Also, different Amazon regional sites require different cookie settings. For example, to obtain delivery information for a specific zip code, you need to set the corresponding location cookie.

import requests
from http.cookies import SimpleCookie

class AmazonSessionManager:
    def __init__(self, zipcode='10001'):
        self.session = requests.Session()
        self.zipcode = zipcode
        self.setup_session()

    def setup_session(self):
        # First, visit the homepage to establish a basic session
        self.session.get('https://www.amazon.com', timeout=10)

        # Set the zip code
        self.set_zipcode(self.zipcode)

    def set_zipcode(self, zipcode):
        # Set the delivery address cookie
        cookie_data = f'postal-code={zipcode}; lc-main=en_US'
        cookie = SimpleCookie()
        cookie.load(cookie_data)

        for key, morsel in cookie.items():
            self.session.cookies[key] = morsel.value

    def get_product_details(self, asin):
        url = f'https://www.amazon.com/dp/{asin}'

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://www.amazon.com/',
            'Accept-Language': 'en-US,en;q=0.9'
        }

        response = self.session.get(url, headers=headers, timeout=15)
        return response

Advanced Data Parsing Techniques

Handling Dynamic Content

Modern Amazon pages use JavaScript extensively to dynamically load content, especially key information like prices, inventory status, and reviews. The traditional requests library can only get the initial HTML and cannot execute JavaScript, which means a lot of important data cannot be obtained.
There are several ways to solve this problem. The first is to use browser automation tools like Selenium, but this method consumes a lot of resources, is slow, and is more easily detected. The second is to analyze the webpage’s AJAX requests and directly call the API endpoints to get the data. The third is to use a headless browser, finding a balance between performance and functionality.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

class DynamicContentScraper:
    def __init__(self):
        self.setup_driver()

    def setup_driver(self):
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        self.driver = webdriver.Chrome(options=chrome_options)

    def get_dynamic_price(self, asin):
        url = f'https://www.amazon.com/dp/{asin}'
        self.driver.get(url)

        try:
            # Wait for the price element to load
            price_element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '.a-price-whole'))
            )

            # Get the complete price information
            whole_price = price_element.text
            fraction_price = self.driver.find_element(By.CSS_SELECTOR, '.a-price-fraction').text

            return f"{whole_price}.{fraction_price}"

        except Exception as e:
            print(f"Failed to get price for {asin}: {e}")
            return None

    def close(self):
        if hasattr(self, 'driver'):
            self.driver.quit()

Adapting to Variable DOM Structures

Changes in Amazon’s page structure are one of the biggest challenges in data extraction. Different product categories, different seller types, and different promotional statuses can all lead to changes in the page structure. To cope with this change, a set of flexible data extraction rules needs to be established.
An effective method is to establish a multi-level selector backup mechanism. When the main selector fails, it automatically tries the backup selectors. At the same time, text matching and positional relationships can be combined to improve extraction accuracy.

from bs4 import BeautifulSoup
import re

class FlexibleDataExtractor:
    def __init__(self):
        # Define multi-level selectors
        self.price_selectors = [
            '.a-price.a-text-price.a-size-medium.apexPriceToPay .a-offscreen',
            '.a-price-whole',
            '#priceblock_dealprice',
            '#priceblock_ourprice',
            '.a-price .a-offscreen',
            '.a-price-display .a-price-symbol + .a-price-whole'
        ]

        self.title_selectors = [
            '#productTitle',
            '.a-size-large.product-title-word-break',
            '.a-text-normal .a-size-base-plus'
        ]

    def extract_price(self, soup):
        for selector in self.price_selectors:
            try:
                element = soup.select_one(selector)
                if element and element.text.strip():
                    price_text = element.text.strip()
                    # Clean the price text
                    price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
                    if price_match:
                        return float(price_match.group())
            except Exception as e:
                continue

        # If all selectors fail, try text matching
        return self.extract_price_by_text(soup)

    def extract_price_by_text(self, soup):
        # Search for price patterns in the page
        price_patterns = [
            r'\$[\d,]+\.?\d*',
            r'Price:\s*\$[\d,]+\.?\d*',
            r'Our Price:\s*\$[\d,]+\.?\d*'
        ]

        page_text = soup.get_text()
        for pattern in price_patterns:
            matches = re.findall(pattern, page_text)
            if matches:
                price_text = matches[0].replace('$', '').replace(',', '')
                price_match = re.search(r'[\d.]+', price_text)
                if price_match:
                    return float(price_match.group())

        return None

    def extract_title(self, soup):
        for selector in self.title_selectors:
            try:
                element = soup.select_one(selector)
                if element and element.text.strip():
                    return element.text.strip()
            except Exception:
                continue

        return None

Handling Product Variations and Options

Amazon products often have multiple variations, such as different sizes, colors, capacities, etc. These variations usually share the same parent ASIN but have different child ASINs. When scraping data, it is necessary to be able to identify and handle these variation relationships.
A more complex situation is when the price and inventory information for certain variations need to be loaded dynamically via AJAX requests. This requires us not only to parse static HTML but also to simulate the user’s behavior of selecting different options to trigger the corresponding data updates.

import json
import re
from urllib.parse import urljoin
from bs4 import BeautifulSoup

class VariantHandler:
    def __init__(self, session):
        self.session = session

    def extract_variants(self, soup, base_url):
        variants = []

        # Find the variant data script
        scripts = soup.find_all('script')
        for script in scripts:
            if script.string and 'dimensionValuesDisplayData' in script.string:
                # Extract the variant configuration data
                variant_data = self.parse_variant_script(script.string)
                if variant_data:
                    variants.extend(variant_data)

        # Find variant links
        variant_links = soup.select('.a-button-text img')
        for link in variant_links:
            parent = link.find_parent('a')
            if parent and parent.get('href'):
                variant_url = urljoin(base_url, parent['href'])
                variants.append({
                    'url': variant_url,
                    'image': link.get('src'),
                    'alt': link.get('alt')
                })

        return variants

    def parse_variant_script(self, script_content):
        try:
            # Extract JSON data
            json_match = re.search(r'dimensionValuesDisplayData\s*:\s*({.+?})\s*[,}]', 
                                 script_content, re.DOTALL)
            if json_match:
                data = json.loads(json_match.group(1))
                return self.process_variant_data(data)
        except Exception as e:
            print(f"Failed to parse variant script: {e}")

        return []

    def process_variant_data(self, data):
        variants = []
        for key, variant_info in data.items():
            if isinstance(variant_info, dict):
                variants.append({
                    'asin': variant_info.get('ASIN'),
                    'price': variant_info.get('price'),
                    'availability': variant_info.get('availability'),
                    'variant_key': key
                })

        return variants

    def get_variant_price(self, asin, dimension_values):
        # Construct an AJAX request to get the variant price
        ajax_url = 'https://www.amazon.com/gp/product/ajax/ref=dp_aod_NEW_mbc'

        params = {
            'asin': asin,
            'pc': 'dp',
            'experienceId': 'aodAjaxMain'
        }

        # Add dimension parameters
        for key, value in dimension_values.items():
            params[f'dimension_{key}'] = value

        try:
            response = self.session.get(ajax_url, params=params, timeout=10)
            if response.status_code == 200:
                return self.parse_ajax_price_response(response.text)
        except Exception as e:
            print(f"Failed to get variant price: {e}")

        return None

    def parse_ajax_price_response(self, response_text):
        # Parse the price information from the AJAX response
        soup = BeautifulSoup(response_text, 'html.parser')
        price_element = soup.select_one('.a-price .a-offscreen')
        if price_element:
            price_text = price_element.text.strip()
            price_match = re.search(r'[\d.]+', price_text.replace(',', ''))
            if price_match:
                return float(price_match.group())

        return None

Strategies for Handling Special Data Types

In-depth Scraping of Review Data

Amazon’s review data is of great value for product analysis and market research, but scraping review data faces special challenges. First, reviews are usually displayed on paginated pages, requiring handling of pagination logic. Second, the sorting method of reviews (latest, most helpful, etc.) affects the content displayed. Most importantly, Amazon recently launched the “Customer Says” feature, and this part of the data is loaded dynamically via AJAX, making it difficult for traditional methods to obtain.
When processing review data, it is also necessary to pay attention to the structured information of the reviews, including rating, review time, verified purchase status, helpfulness votes, etc. This metadata is also important for review quality analysis.

import re
import time
from datetime import datetime
from bs4 import BeautifulSoup

class ReviewScraper:
    def __init__(self, session):
        self.session = session

    def get_all_reviews(self, asin, max_pages=10):
        reviews = []
        base_url = f'https://www.amazon.com/product-reviews/{asin}'

        for page in range(1, max_pages + 1):
            params = {
                'ie': 'UTF8',
                'reviewerType': 'all_reviews',
                'pageNumber': page,
                'sortBy': 'recent'
            }

            try:
                response = self.session.get(base_url, params=params, timeout=15)
                if response.status_code != 200:
                    break

                soup = BeautifulSoup(response.content, 'html.parser')
                page_reviews = self.extract_reviews_from_page(soup)

                if not page_reviews:  # No more reviews
                    break

                reviews.extend(page_reviews)
                time.sleep(2)  # Control request frequency

            except Exception as e:
                print(f"Failed to scrape page {page}: {e}")
                break

        return reviews

    def extract_reviews_from_page(self, soup):
        reviews = []
        review_elements = soup.select('[data-hook="review"]')

        for element in review_elements:
            try:
                review_data = self.parse_single_review(element)
                if review_data:
                    reviews.append(review_data)
            except Exception as e:
                print(f"Failed to parse review: {e}")
                continue

        return reviews

    def parse_single_review(self, review_element):
        # Extract basic review information
        rating_element = review_element.select_one('[data-hook="review-star-rating"]')
        rating = None
        if rating_element:
            rating_text = rating_element.get('class', [])
            for cls in rating_text:
                if 'a-star-' in cls:
                    rating_match = re.search(r'a-star-(\d)', cls)
                    if rating_match:
                        rating = int(rating_match.group(1))
                        break

        # Extract title
        title_element = review_element.select_one('[data-hook="review-title"]')
        title = title_element.get_text(strip=True) if title_element else None

        # Extract body
        body_element = review_element.select_one('[data-hook="review-body"]')
        body = body_element.get_text(strip=True) if body_element else None

        # Extract date
        date_element = review_element.select_one('[data-hook="review-date"]')
        date = None
        if date_element:
            date_text = date_element.get_text(strip=True)
            date_match = re.search(r'on (.+)', date_text)
            if date_match:
                try:
                    date = datetime.strptime(date_match.group(1), '%B %d, %Y')
                except ValueError:
                    pass

        # Extract helpfulness vote
        helpful_element = review_element.select_one('[data-hook="helpful-vote-statement"]')
        helpful_count = 0
        if helpful_element:
            helpful_text = helpful_element.get_text(strip=True)
            helpful_match = re.search(r'(\d+)', helpful_text)
            if helpful_match:
                helpful_count = int(helpful_match.group(1))

        # Check if it is a verified purchase
        verified = bool(review_element.select_one('[data-hook="avp-badge"]'))

        return {
            'rating': rating,
            'title': title,
            'body': body,
            'date': date,
            'helpful_count': helpful_count,
            'verified_purchase': verified
        }

    def get_customer_says_data(self, asin):
        # AJAX request to get Customer Says data
        ajax_url = 'https://www.amazon.com/hz/reviews-render/ajax/medley-filtered-reviews'

        params = {
            'asin': asin,
            'reviewerType': 'all_reviews',
            'mediaType': 'all_contents',
            'filterByStar': 'all_stars'
        }

        headers = {
            'X-Requested-With': 'XMLHttpRequest',
            'Referer': f'https://www.amazon.com/dp/{asin}'
        }

        try:
            response = self.session.get(ajax_url, params=params, headers=headers, timeout=10)
            if response.status_code == 200:
                return self.parse_customer_says_response(response.json())
        except Exception as e:
            print(f"Failed to get customer says data: {e}")

        return None

    def parse_customer_says_response(self, response_data):
        customer_says = {}

        if 'medley' in response_data:
            medley_data = response_data['medley']

            # Extract keyword sentiment analysis
            if 'keywords' in medley_data:
                keywords = {}
                for keyword_data in medley_data['keywords']:
                    keywords[keyword_data['text']] = {
                        'sentiment': keyword_data.get('sentiment'),
                        'count': keyword_data.get('count'),
                        'percentage': keyword_data.get('percentage')
                    }
                customer_says['keywords'] = keywords

            # Extract popular review summaries
            if 'summaries' in medley_data:
                summaries = []
                for summary in medley_data['summaries']:
                    summaries.append({
                        'text': summary.get('text'),
                        'sentiment': summary.get('sentiment'),
                        'source_count': summary.get('sourceCount')
                    })
                customer_says['summaries'] = summaries

        return customer_says

Identifying and Handling Sponsored Ad Slots

When scraping Amazon search results, correctly identifying and handling Sponsored ad slots is a technical challenge. The HTML structure of these ad slots is slightly different from natural search results, and Amazon constantly adjusts its display logic. More importantly, the appearance of ad slots has a certain randomness; the same search keyword may display different ads at different times.
To achieve an ad slot identification rate of over 98%, a combination of multiple judgment criteria is needed: HTML attributes, CSS class names, element position, text identifiers, etc. It is also necessary to consider differences across different device types and regions.

import re

class SponsoredAdDetector:
    def __init__(self):
        # Define multiple criteria for ad identification
        self.sponsored_indicators = {
            'attributes': ['data-sponsored', 'data-asin', 'data-cel-widget'],
            'css_classes': ['AdHolder', 's-sponsored-info-icon', 'a-size-base-plus'],
            'text_patterns': ['Sponsored', 'Ad', '#ad', 'Promoted'],
            'structural_patterns': ['[data-component-type="sp-sponsored-result"]']
        }

    def detect_sponsored_products(self, soup):
        sponsored_products = []

        # Find all possible product containers
        product_containers = soup.select('[data-asin], [data-index], .s-result-item')

        for container in product_containers:
            if self.is_sponsored(container):
                product_data = self.extract_sponsored_product_data(container)
                if product_data:
                    sponsored_products.append(product_data)

        return sponsored_products

    def is_sponsored(self, element):
        # Multi-dimensional judgment to determine if it is an ad
        confidence_score = 0

        # Check attribute identifiers
        if element.get('data-sponsored'):
            confidence_score += 30

        if element.get('data-component-type') == 'sp-sponsored-result':
            confidence_score += 40

        # Check CSS class names
        element_classes = ' '.join(element.get('class', []))
        for indicator_class in self.sponsored_indicators['css_classes']:
            if indicator_class in element_classes:
                confidence_score += 15

        # Check text identifiers
        element_text = element.get_text().lower()
        for text_pattern in self.sponsored_indicators['text_patterns']:
            if text_pattern.lower() in element_text:
                confidence_score += 20

        # Check for sponsored icon
        sponsored_icon = element.select_one('.s-sponsored-info-icon, [aria-label*="ponsored"]')
        if sponsored_icon:
            confidence_score += 25

        # Check positional features (the first few results are more likely to be ads)
        parent_results = element.find_parent().select('.s-result-item')
        if parent_results:
            try:
                position = parent_results.index(element) + 1
                if position <= 4:  # First 4 positions
                    confidence_score += 10
            except ValueError:
                pass # Element not in the list

        return confidence_score >= 50

    def extract_sponsored_product_data(self, element):
        try:
            # Extract ASIN
            asin = element.get('data-asin')
            if not asin:
                asin_link = element.select_one('[data-asin]')
                asin = asin_link.get('data-asin') if asin_link else None

            # Extract title
            title_element = element.select_one('h2 a span, .s-color-base')
            title = title_element.get_text(strip=True) if title_element else None

            # Extract price
            price_element = element.select_one('.a-price .a-offscreen')
            price = None
            if price_element:
                price_text = price_element.get_text(strip=True)
                price_match = re.search(r'[\d.]+', price_text.replace(',', ''))
                if price_match:
                    price = float(price_match.group())

            # Extract rating
            rating_element = element.select_one('.a-icon-alt')
            rating = None
            if rating_element:
                rating_text = rating_element.get('title', '')
                rating_match = re.search(r'(\d\.?\d*)', rating_text)
                if rating_match:
                    rating = float(rating_match.group(1))

            # Extract image
            image_element = element.select_one('.s-image')
            image_url = image_element.get('src') if image_element else None

            return {
                'asin': asin,
                'title': title,
                'price': price,
                'rating': rating,
                'image_url': image_url,
                'is_sponsored': True,
                'ad_type': self.determine_ad_type(element)
            }

        except Exception as e:
            print(f"Failed to extract sponsored product data: {e}")
            return None

    def determine_ad_type(self, element):
        # Determine the ad type
        if element.select_one('[data-component-type="sp-sponsored-result"]'):
            return 'sponsored_product'
        elif 'AdHolder' in element.get('class', []):
            return 'display_ad'
        else:
            return 'unknown'

Complete Scraping of Ranking Data

Amazon’s various ranking lists (Best Sellers, New Releases, Movers & Shakers, etc.) are important sources of market data, but scraping ranking data has its peculiarities. First, the rankings are dynamically updated over time, so the timeliness of the data needs to be considered. Second, the rankings usually have a category hierarchy, requiring traversal of different categories. Most importantly, some ranking data may require a login to access complete information.

import re
import time
from urllib.parse import urljoin
from bs4 import BeautifulSoup

class AmazonRankingScraper:
    def __init__(self, session):
        self.session = session
        self.category_urls = self.load_category_mapping()

    def load_category_mapping(self):
        # URL mapping for major Amazon categories
        return {
            'electronics': 'https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics',
            'books': 'https://www.amazon.com/Best-Sellers-Books/zgbs/books',
            'home_garden': 'https://www.amazon.com/Best-Sellers-Home-Garden/zgbs/home-garden',
            'toys_games': 'https://www.amazon.com/Best-Sellers-Toys-Games/zgbs/toys-and-games',
            'sports_outdoors': 'https://www.amazon.com/Best-Sellers-Sports-Outdoors/zgbs/sporting-goods'
        }

    def scrape_bestsellers_category(self, category_key, max_pages=5):
        if category_key not in self.category_urls:
            raise ValueError(f"Unknown category: {category_key}")

        base_url = self.category_urls[category_key]
        products = []

        for page in range(1, max_pages + 1):
            page_url = f"{base_url}?pg={page}" if page > 1 else base_url

            try:
                response = self.session.get(page_url, timeout=15)
                if response.status_code != 200:
                    break

                soup = BeautifulSoup(response.content, 'html.parser')
                page_products = self.extract_ranking_products(soup, page)

                if not page_products:
                    break

                products.extend(page_products)
                time.sleep(2)

            except Exception as e:
                print(f"Failed to scrape page {page} of {category_key}: {e}")
                break

        return products

    def extract_ranking_products(self, soup, page_num):
        products = []

        # Main container for ranking products
        product_elements = soup.select('.zg-item-immersion, .zg-item')

        for idx, element in enumerate(product_elements):
            try:
                # Calculate rank (considering pagination)
                rank = (page_num - 1) * 50 + idx + 1 # Assuming 50 items per page

                # Extract ASIN
                asin_link = element.select_one('a[href*="/dp/"]')
                asin = None
                if asin_link:
                    href = asin_link.get('href', '')
                    asin_match = re.search(r'/dp/([A-Z0-9]{10})', href)
                    if asin_match:
                        asin = asin_match.group(1)

                # Extract title
                title_element = element.select_one('.p13n-sc-truncated, ._cDEzb_p13n-sc-css-line-clamp-1_1Fn1y')
                title = title_element.get_text(strip=True) if title_element else None

                # Extract price
                price_element = element.select_one('.p13n-sc-price, .a-price .a-offscreen')
                price = self.parse_price(price_element.get_text() if price_element else None)

                # Extract rating information
                rating_element = element.select_one('.a-icon-alt')
                rating = None
                if rating_element:
                    rating_text = rating_element.get_text(strip=True)
                    rating_match = re.search(r'(\d\.?\d*)', rating_text)
                    if rating_match:
                        rating = float(rating_match.group(1))

                # Extract review count
                review_element = element.select_one('.a-size-small .a-link-normal')
                review_count = None
                if review_element:
                    review_text = review_element.get_text(strip=True)
                    review_match = re.search(r'([\d,]+)', review_text.replace(',', ''))
                    if review_match:
                        review_count = int(review_match.group(1))

                product_data = {
                    'rank': rank,
                    'asin': asin,
                    'title': title,
                    'price': price,
                    'rating': rating,
                    'review_count': review_count,
                    'category': self.extract_category_breadcrumb(soup)
                }

                products.append(product_data)

            except Exception as e:
                print(f"Failed to extract product at index {idx}: {e}")
                continue

        return products

    def parse_price(self, price_text):
        if not price_text:
            return None

        # Clean the price text and extract the number
        clean_price = re.sub(r'[^\d.]', '', price_text.replace(',', ''))
        try:
            return float(clean_price) if clean_price else None
        except ValueError:
            return None

    def extract_category_breadcrumb(self, soup):
        # Extract the category breadcrumb navigation
        breadcrumb_element = soup.select_one('.a-breadcrumb .a-list-item:last-child')
        return breadcrumb_element.get_text(strip=True) if breadcrumb_element else None

    def get_subcategories(self, category_url):
        # Get the list of subcategories
        try:
            response = self.session.get(category_url, timeout=10)
            soup = BeautifulSoup(response.content, 'html.parser')

            subcategories = []
            category_links = soup.select('.zg_browseRoot a')

            for link in category_links:
                subcategories.append({
                    'name': link.get_text(strip=True),
                    'url': urljoin(category_url, link.get('href'))
                })

            return subcategories

        except Exception as e:
            print(f"Failed to get subcategories: {e}")
            return []

Advantages of Professional API Solutions

Faced with the many challenges of Amazon data scraping, more and more companies are starting to consider using professional API services. Compared to building an in-house scraping team, professional APIs have significant advantages in several aspects.

Considerations of Technical Barriers and Maintenance Costs

Building an in-house Amazon data scraping system requires significant technical investment. First is the labor cost: experienced scraper engineers are needed to design and implement complex anti-scraping strategies. More important is the continuous maintenance cost; Amazon's page structure and anti-scraping mechanisms are constantly changing, requiring the technical team to follow up and adjust continuously.

Taking the Pangolin Scrape API as an example, its team has accumulated rich experience in Amazon data scraping and can quickly adapt to various changes on the platform. For businesses, using such a professional service allows them to focus their technical team’s energy on core business logic, rather than spending a lot of time dealing with the technical details of data acquisition.

# Simple example of using a professional API
import time
import requests

class PangolinAPIClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://scrapeapi.pangolinfo.com/api/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def scrape_product_details(self, asin, zipcode="10001"):
        url = f"{self.base_url}/scrape"

        payload = {
            "url": f"https://www.amazon.com/dp/{asin}",
            "formats": ["json"],
            "parserName": "amzProductDetail",
            "bizContext": {"zipcode": zipcode}
        }

        try:
            response = requests.post(url, json=payload, headers=self.headers, timeout=30)

            if response.status_code == 200:
                return response.json()
            else:
                print(f"API request failed with status {response.status_code}")
                return None

        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None

    def batch_scrape_products(self, asin_list, zipcode="10001"):
        results = {}

        for asin in asin_list:
            print(f"Scraping product: {asin}")
            result = self.scrape_product_details(asin, zipcode)

            if result and result.get('code') == 0:
                results[asin] = result['data']
            else:
                results[asin] = None

            # Control request frequency to avoid being too frequent
            time.sleep(1)

        return results

# Practical usage example
def analyze_competitor_products():
    client = PangolinAPIClient("your_api_key_here")

    competitor_asins = [
        "B08N5WRWNW",  # Echo Dot
        "B07XJ8C8F5",  # Fire TV Stick
        "B079QHML21"   # Echo Show
    ]

    product_data = client.batch_scrape_products(competitor_asins)

    # Analyze price trends
    for asin, data in product_data.items():
        if data:
            print(f"Product {asin}:")
            print(f"  Title: {data.get('title')}")
            print(f"  Price: ${data.get('price')}")
            print(f"  Rating: {data.get('rating')} ({data.get('review_count')} reviews)")
            print("  ---")

Data Quality and Accuracy Guarantees

Professional API services have significant advantages in terms of data quality. Taking the identification of Sponsored ad slots as an example, Pangolin can achieve an identification accuracy of 98%, which is crucial for businesses that need to accurately analyze keyword traffic sources. Such accuracy requires a large amount of test data and algorithm optimization, which is not something an ordinary in-house team can quickly achieve.
More important is data completeness. After Amazon closed the traditional review data interface, how to obtain complete “Customer Says” data has become a technical difficulty. Professional API services have already found effective solutions and can provide complete data, including sentiment analysis corresponding to various popular review words.

Scalability

When the business scale reaches a certain level, the scalability of data scraping becomes crucial. Professional API services usually have the ability to process tens of millions of pages per day, a scale that is difficult for most in-house teams to achieve. More importantly, professional services have significant advantages in cost optimization, with relatively low marginal costs.

Compliance and Risk Control

Averting Legal Risks

When conducting Amazon data scraping, compliance is an important factor that cannot be ignored. Although scraping public data is legal in most cases, the specific implementation method may involve issues with the platform’s terms of service. Professional API service providers usually have a deeper understanding of these legal risks and can adopt more prudent methods in their technical implementation.

Account Safety and Risk Isolation

An important issue faced by self-built scraper systems is the risk to account safety. If the scraper’s behavior is detected by Amazon, it may affect the company’s normal operating accounts. Using a professional API service can effectively isolate this risk and avoid affecting the normal operation of the core business.

Practical Case Studies

Complete Workflow for Product Selection Analysis

Let’s use a complete product selection analysis case to demonstrate the practical application of Amazon data scraping. Suppose we are looking for potential product opportunities in the home goods category.

class ProductOpportunityAnalyzer:
    def __init__(self, api_client):
        self.api_client = api_client

    def analyze_category_opportunity(self, category_keywords, min_price=20, max_price=100):
        # Step 1: Get keyword search results
        search_data = {}
        for keyword in category_keywords:
            print(f"Analyzing keyword: {keyword}")
            results = self.api_client.scrape_keyword_results(keyword)
            search_data[keyword] = results

        # Step 2: Filter products within the price range
        target_products = self.filter_products_by_price(search_data, min_price, max_price)

        # Step 3: Get detailed product information
        detailed_data = {}
        for asin in target_products:
            product_details = self.api_client.scrape_product_details(asin)
            if product_details:
                detailed_data[asin] = product_details

        # Step 4: Competitive analysis
        opportunities = self.identify_opportunities(detailed_data)

        return opportunities

    def filter_products_by_price(self, search_data, min_price, max_price):
        target_asins = set()

        for keyword, results in search_data.items():
            if results and results.get('data'):
                for product in results['data'].get('products', []):
                    price = product.get('price')
                    if price and min_price <= price <= max_price:
                        target_asins.add(product.get('asin'))

        return list(target_asins)

    def identify_opportunities(self, product_data):
        opportunities = []

        for asin, data in product_data.items():
            # Calculate opportunity score
            opportunity_score = self.calculate_opportunity_score(data)

            if opportunity_score >= 70:  # High-score products
                opportunities.append({
                    'asin': asin,
                    'title': data.get('title'),
                    'price': data.get('price'),
                    'rating': data.get('rating'),
                    'review_count': data.get('review_count'),
                    'opportunity_score': opportunity_score,
                    'opportunity_factors': self.analyze_opportunity_factors(data)
                })

        # Sort by opportunity score
        opportunities.sort(key=lambda x: x['opportunity_score'], reverse=True)

        return opportunities

    def calculate_opportunity_score(self, product_data):
        score = 0

        # Rating factor (4.0-4.5 stars has room for optimization)
        rating = product_data.get('rating', 0)
        if 4.0 <= rating <= 4.5:
            score += 25
        elif rating < 4.0:
            score += 15

        # Number of reviews (not many reviews but a certain foundation)
        review_count = product_data.get('review_count', 0)
        if 100 <= review_count <= 1000:
            score += 30
        elif 50 <= review_count < 100:
            score += 20

        # Price competitiveness
        price = product_data.get('price', 0)
        if 30 <= price <= 80:  # Mid-range prices have more room for optimization
            score += 20

        # Image quality analysis
        images = product_data.get('images', [])
        if len(images) < 7:  # Insufficient number of images, an opportunity for optimization
            score += 15

        return min(score, 100)  # Max 100 points

    def analyze_opportunity_factors(self, product_data):
        factors = []

        rating = product_data.get('rating', 0)
        if rating < 4.5:
            factors.append(f"Room for rating improvement (Current: {rating})")

        review_count = product_data.get('review_count', 0)
        if review_count < 500:
            factors.append(f"Fewer reviews, less competition (Current: {review_count})")

        images = product_data.get('images', [])
        if len(images) < 7:
            factors.append(f"Insufficient number of product images (Current: {len(images)} images)")

        # Analyze customer feedback
        customer_says = product_data.get('customer_says', {})
        if customer_says:
            negative_keywords = self.extract_negative_feedback(customer_says)
            if negative_keywords:
                factors.append(f"Customer pain points: {', '.join(negative_keywords[:3])}")

        return factors

    def extract_negative_feedback(self, customer_says_data):
        negative_keywords = []

        keywords = customer_says_data.get('keywords', {})
        for keyword, data in keywords.items():
            if data.get('sentiment') == 'negative' and data.get('count', 0) > 5:
                negative_keywords.append(keyword)

        return negative_keywords

# Usage example
def run_opportunity_analysis():
    api_client = PangolinAPIClient("your_api_key")
    analyzer = ProductOpportunityAnalyzer(api_client)

    home_keywords = [
        "kitchen organizer",
        "bathroom storage",
        "closet organizer",
        "desk organizer"
    ]

    opportunities = analyzer.analyze_category_opportunity(
        home_keywords, 
        min_price=25, 
        max_price=75
    )

    print("=== Product Opportunity Analysis Report ===")
    for i, opp in enumerate(opportunities[:5], 1):
        print(f"\n{i}. {opp['title'][:50]}...")
        print(f"   ASIN: {opp['asin']}")
        print(f"   Price: ${opp['price']}")
        print(f"   Rating: {opp['rating']} ({opp['review_count']} reviews)")
        print(f"   Opportunity Score: {opp['opportunity_score']}/100")
        print(f"   Optimization Opportunities:")
        for factor in opp['opportunity_factors']:
            print(f"     • {factor}")

Competitor Monitoring System

Continuously monitoring competitors’ prices, inventory, and promotional strategies is an important part of e-commerce operations. Through regular data scraping, you can promptly discover market changes and adjust your own strategies.

import time
import sqlite3
from datetime import datetime, timedelta
import pandas as pd

class CompetitorMonitoringSystem:
    def __init__(self, api_client, db_path="competitor_data.db"):
        self.api_client = api_client
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS competitor_products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asin TEXT NOT NULL,
                competitor_name TEXT,
                category TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asin TEXT NOT NULL,
                price REAL,
                stock_status TEXT,
                coupon_discount REAL,
                prime_eligible BOOLEAN,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (asin) REFERENCES competitor_products (asin)
            )
        ''')

        conn.commit()
        conn.close()

    def add_competitor_product(self, asin, competitor_name, category):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT OR IGNORE INTO competitor_products (asin, competitor_name, category)
            VALUES (?, ?, ?)
        ''', (asin, competitor_name, category))

        conn.commit()
        conn.close()

    def monitor_daily_prices(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('SELECT DISTINCT asin FROM competitor_products')
        asins = [row[0] for row in cursor.fetchall()]
        conn.close()

        for asin in asins:
            try:
                print(f"Monitoring product: {asin}")
                product_data = self.api_client.scrape_product_details(asin)

                if product_data and product_data.get('code') == 0:
                    data = product_data['data']
                    self.save_price_data(asin, data)

                time.sleep(2)  # Control frequency

            except Exception as e:
                print(f"Failed to monitor {asin}: {e}")

    def save_price_data(self, asin, data):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            INSERT INTO price_history 
            (asin, price, stock_status, coupon_discount, prime_eligible)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            asin,
            data.get('price'),
            'in_stock' if data.get('has_cart') else 'out_of_stock',
            data.get('coupon', 0),
            data.get('prime_eligible', False)
        ))

        conn.commit()
        conn.close()

    def get_price_trend_analysis(self, asin, days=30):
        conn = sqlite3.connect(self.db_path)

        query = f'''
            SELECT price, scraped_at 
            FROM price_history 
            WHERE asin = ? AND scraped_at >= date('now', '-{days} days')
            ORDER BY scraped_at
        '''

        df = pd.read_sql_query(query, conn, params=(asin,))
        conn.close()

        if df.empty:
            return None

        df['scraped_at'] = pd.to_datetime(df['scraped_at'])

        analysis = {
            'current_price': df['price'].iloc[-1],
            'min_price': df['price'].min(),
            'max_price': df['price'].max(),
            'avg_price': df['price'].mean(),
            'price_volatility': df['price'].std(),
            'price_trend': 'stable'
        }

        # Simple trend analysis
        recent_avg = df['price'].tail(7).mean()
        earlier_avg = df['price'].head(7).mean()

        if recent_avg > earlier_avg * 1.05:
            analysis['price_trend'] = 'increasing'
        elif recent_avg < earlier_avg * 0.95:
            analysis['price_trend'] = 'decreasing'

        return analysis

    def generate_competitive_report(self):
        conn = sqlite3.connect(self.db_path)

        # Get the latest data for all monitored products
        query = '''
            SELECT p.asin, p.competitor_name, p.category,
                   ph.price, ph.stock_status, ph.scraped_at
            FROM competitor_products p
            JOIN price_history ph ON p.asin = ph.asin
            WHERE ph.scraped_at = (
                SELECT MAX(scraped_at) 
                FROM price_history 
                WHERE asin = p.asin
            )
        '''

        df = pd.read_sql_query(query, conn)
        conn.close()

        print("\n=== Competitor Monitoring Report ===")
        print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Number of Monitored Products: {len(df)}")

        # Group analysis by category
        for category in df['category'].unique():
            category_data = df[df['category'] == category]
            print(f"\n【{category} Category】")

            for _, row in category_data.iterrows():
                trend_analysis = self.get_price_trend_analysis(row['asin'])

                print(f"  Product: {row['asin']} ({row['competitor_name']})")
                print(f"    Current Price: ${row['price']:.2f}")
                print(f"    Stock Status: {row['stock_status']}")

                if trend_analysis:
                    print(f"    Price Trend: {trend_analysis['price_trend']}")
                    print(f"    30-Day Price Range: ${trend_analysis['min_price']:.2f} - ${trend_analysis['max_price']:.2f}")

Exploring Advanced Application Scenarios

Market Trend Prediction Model

By accumulating Amazon data over the long term, we can build simple market trend prediction models. This includes not only price trends but also multi-dimensional analysis such as demand changes and seasonal characteristics.

import sqlite3
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

class MarketTrendPredictor:
    def __init__(self, db_path):
        self.db_path = db_path
        self.scaler = StandardScaler()
        self.models = {}

    def prepare_trend_data(self, asin, feature_days=30):
        conn = sqlite3.connect(self.db_path)

        # Get historical data
        query = '''
            SELECT price, scraped_at,
                   LAG(price, 1) OVER (ORDER BY scraped_at) as prev_price,
                   LAG(price, 7) OVER (ORDER BY scraped_at) as week_ago_price
            FROM price_history 
            WHERE asin = ? 
            ORDER BY scraped_at
        '''

        df = pd.read_sql_query(query, conn, params=(asin,))
        conn.close()

        if len(df) < feature_days:
            return None, None

        # Construct features
        df['price_change_1d'] = df['price'] - df['prev_price']
        df['price_change_7d'] = df['price'] - df['week_ago_price']
        df['day_of_week'] = pd.to_datetime(df['scraped_at']).dt.dayofweek
        df['day_of_month'] = pd.to_datetime(df['scraped_at']).dt.day

        # Moving average
        df['ma_7'] = df['price'].rolling(window=7).mean()
        df['ma_14'] = df['price'].rolling(window=14).mean()

        # Price volatility
        df['volatility_7'] = df['price'].rolling(window=7).std()

        # Remove rows containing NaN
        df = df.dropna()

        # Prepare features and target
        features = ['price_change_1d', 'price_change_7d', 'day_of_week', 
                   'day_of_month', 'ma_7', 'ma_14', 'volatility_7']

        X = df[features].values
        y = df['price'].values[1:]  # Predict next day's price
        X = X[:-1]  # Adjust features accordingly

        return X, y

    def train_price_prediction_model(self, asin):
        X, y = self.prepare_trend_data(asin)

        if X is None or len(X) < 20:
            print(f"Insufficient data for {asin}")
            return False

        # Standardize features with a dedicated scaler per ASIN so trained models don't share state
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # Train model
        model = LinearRegression()
        model.fit(X_scaled, y)

        # Calculate model performance
        train_score = model.score(X_scaled, y)
        print(f"Model R² score for {asin}: {train_score:.3f}")

        self.models[asin] = {
            'model': model,
            'scaler': scaler,
            'features': ['price_change_1d', 'price_change_7d', 'day_of_week', 
                        'day_of_month', 'ma_7', 'ma_14', 'volatility_7'],
            'score': train_score
        }

        return True

    def predict_future_price(self, asin, days_ahead=7):
        if asin not in self.models:
            print(f"No model trained for {asin}")
            return None

        model_info = self.models[asin]
        model = model_info['model']
        scaler = model_info['scaler']

        # Get the latest data point
        X_latest, _ = self.prepare_trend_data(asin)
        if X_latest is None:
            return None

        predictions = []
        current_features = X_latest[-1:].copy()

        for day in range(days_ahead):
            # Standardize features
            features_scaled = scaler.transform(current_features)

            # Predict price
            predicted_price = model.predict(features_scaled)[0]
            predictions.append(predicted_price)

            # Update features for the next prediction round
            # This is a simplified process; real applications require more complex feature update logic
            current_features[0, 0] = predicted_price - current_features[0, 0]  # Update price_change_1d

        return predictions

    def analyze_seasonal_patterns(self, category):
        conn = sqlite3.connect(self.db_path)

        query = '''
            SELECT p.asin, ph.price, ph.scraped_at
            FROM competitor_products p
            JOIN price_history ph ON p.asin = ph.asin
            WHERE p.category = ?
            ORDER BY ph.scraped_at
        '''

        df = pd.read_sql_query(query, conn, params=(category,))
        conn.close()

        if df.empty:
            return None

        df['scraped_at'] = pd.to_datetime(df['scraped_at'])
        df['month'] = df['scraped_at'].dt.month
        df['week_of_year'] = df['scraped_at'].dt.isocalendar().week

        # Analyze average price by month
        monthly_patterns = df.groupby('month')['price'].agg(['mean', 'std', 'count'])

        # Analyze price trends by week
        weekly_patterns = df.groupby('week_of_year')['price'].agg(['mean', 'std', 'count'])

        return {
            'monthly_patterns': monthly_patterns,
            'weekly_patterns': weekly_patterns,
            'overall_stats': {
                'avg_price': df['price'].mean(),
                'price_volatility': df['price'].std(),
                'data_points': len(df)
            }
        }
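
A minimal usage sketch, assuming the price_history and competitor_products tables described earlier have been populated (the database path, ASIN, and category below are placeholders):

predictor = MarketTrendPredictor('amazon_products.db')

# Train a per-ASIN price model, then project prices a week ahead
if predictor.train_price_prediction_model('B08N5WRWNW'):
    forecast = predictor.predict_future_price('B08N5WRWNW', days_ahead=7)
    print("7-day price forecast:", [round(p, 2) for p in forecast])

# Look for seasonal pricing patterns across a whole category
seasonal = predictor.analyze_seasonal_patterns('Electronics')
if seasonal:
    print(seasonal['monthly_patterns'])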

Inventory Monitoring and Restocking Alerts

When managing your own product inventory, monitoring competitors' stock status in real time helps you formulate better restocking strategies.

import re
from datetime import datetime

class InventoryMonitor:
    def __init__(self, api_client, notification_webhook=None):
        self.api_client = api_client
        self.webhook = notification_webhook
        self.inventory_history = {}

    def monitor_inventory_status(self, asin_list):
        inventory_alerts = []

        for asin in asin_list:
            try:
                # Get current inventory status
                product_data = self.api_client.scrape_product_details(asin)

                if product_data and product_data.get('code') == 0:
                    data = product_data['data']
                    current_status = self.analyze_inventory_status(data)

                    # Check for status changes
                    alert = self.check_inventory_change(asin, current_status)
                    if alert:
                        inventory_alerts.append(alert)

                    # Update history
                    self.inventory_history[asin] = current_status

            except Exception as e:
                print(f"Failed to monitor inventory for {asin}: {e}")

        # Send alert notifications
        if inventory_alerts:
            self.send_inventory_alerts(inventory_alerts)

        return inventory_alerts

    def analyze_inventory_status(self, product_data):
        status = {
            'timestamp': datetime.now(),
            'has_cart': product_data.get('has_cart', False),
            'delivery_time': product_data.get('deliveryTime'),
            'stock_level': 'unknown'
        }

        # Analyze stock level
        if not status['has_cart']:
            status['stock_level'] = 'out_of_stock'
        elif 'temporarily out of stock' in str(product_data.get('availability', '')).lower():
            status['stock_level'] = 'temporarily_unavailable'
        elif status['delivery_time']:
            delivery_text = status['delivery_time'].lower()
            if 'days' in delivery_text:
                # Extract number of days
                days_match = re.search(r'(\d+)', delivery_text)
                if days_match:
                    delivery_days = int(days_match.group(1))
                    # Check the larger threshold first; otherwise the > 14 branch can never be reached
                    if delivery_days > 14:
                        status['stock_level'] = 'very_low_stock'
                    elif delivery_days > 7:
                        status['stock_level'] = 'low_stock'
                    else:
                        status['stock_level'] = 'in_stock'
            elif 'tomorrow' in delivery_text or 'today' in delivery_text:
                status['stock_level'] = 'high_stock'

        return status

    def check_inventory_change(self, asin, current_status):
        if asin not in self.inventory_history:
            return None

        previous_status = self.inventory_history[asin]

        # Check for key status changes
        if previous_status['stock_level'] != current_status['stock_level']:
            return {
                'asin': asin,
                'alert_type': 'stock_level_change',
                'previous_level': previous_status['stock_level'],
                'current_level': current_status['stock_level'],
                'timestamp': current_status['timestamp']
            }

        # Check for changes in buy box status
        if previous_status['has_cart'] != current_status['has_cart']:
            return {
                'asin': asin,
                'alert_type': 'availability_change',
                'previous_available': previous_status['has_cart'],
                'current_available': current_status['has_cart'],
                'timestamp': current_status['timestamp']
            }

        return None

    def send_inventory_alerts(self, alerts):
        if not self.webhook:
            # Simple console output
            print("\n=== Inventory Alerts ===")
            for alert in alerts:
                print(f"Product {alert['asin']}:")
                print(f"  Alert Type: {alert['alert_type']}")
                if 'stock_level_change' in alert['alert_type']:
                    print(f"  Stock Level Change: {alert['previous_level']} → {alert['current_level']}")
                print(f"  Time: {alert['timestamp']}")
                print("---")
        else:
            # Send to webhook
            try:
                import requests
                response = requests.post(self.webhook, json={'alerts': alerts})
                print(f"Alerts sent to webhook, status: {response.status_code}")
            except Exception as e:
                print(f"Failed to send webhook notification: {e}")

Data Quality Assurance and Validation

When performing large-scale Amazon data scraping, ensuring data quality is crucial. Incorrect data can lead to wrong business decisions, so a complete data validation mechanism needs to be established.

Data Consistency Checks

class DataQualityValidator:
    def __init__(self):
        self.validation_rules = self.load_validation_rules()
        self.anomaly_thresholds = {
            'price_change_percent': 50,  # Price change over 50% needs validation
            'rating_change': 1.0,        # Rating change over 1.0 needs validation
            'review_count_change_percent': 200  # Review count change over 200% needs validation
        }

    def load_validation_rules(self):
        return {
            'price': {
                'min': 0.01,
                'max': 10000,
                'type': float
            },
            'rating': {
                'min': 1.0,
                'max': 5.0,
                'type': float
            },
            'review_count': {
                'min': 0,
                'max': 1000000,
                'type': int
            },
            'title': {
                'min_length': 5,
                'max_length': 500,
                'type': str
            }
        }

    def validate_product_data(self, product_data, asin):
        validation_results = {
            'is_valid': True,
            'warnings': [],
            'errors': []
        }

        # Basic field validation
        for field, rules in self.validation_rules.items():
            value = product_data.get(field)

            if value is None:
                validation_results['warnings'].append(f"Missing field: {field}")
                continue

            # Type check
            expected_type = rules['type']
            if not isinstance(value, expected_type):
                try:
                    value = expected_type(value)
                except (ValueError, TypeError):
                    validation_results['errors'].append(f"Invalid type for {field}: expected {expected_type.__name__}")
                    validation_results['is_valid'] = False
                    continue

            # Range check
            if 'min' in rules and value < rules['min']:
                validation_results['errors'].append(f"{field} value {value} below minimum {rules['min']}")
                validation_results['is_valid'] = False

            if 'max' in rules and value > rules['max']:
                validation_results['errors'].append(f"{field} value {value} above maximum {rules['max']}")
                validation_results['is_valid'] = False

            # Length check (string)
            if expected_type == str:
                if 'min_length' in rules and len(value) < rules['min_length']:
                    validation_results['errors'].append(f"{field} too short: {len(value)} characters")
                    validation_results['is_valid'] = False

                if 'max_length' in rules and len(value) > rules['max_length']:
                    validation_results['warnings'].append(f"{field} very long: {len(value)} characters")

        # Business logic validation
        self.validate_business_logic(product_data, validation_results)

        # Anomaly detection
        self.detect_anomalies(product_data, asin, validation_results)

        return validation_results

    def validate_business_logic(self, data, results):
        # Reasonableness check for rating and review count
        rating = data.get('rating')
        review_count = data.get('review_count')

        if rating and review_count:
            if review_count < 10 and rating > 4.8:
                results['warnings'].append("High rating with very few reviews - potential fake reviews")

            if review_count > 1000 and rating < 3.0:
                results['warnings'].append("Many reviews with very low rating - check data accuracy")

        # Price reasonableness check
        price = data.get('price')
        title = data.get('title', '').lower()

        if price and price < 1.0:
            if not any(keyword in title for keyword in ['ebook', 'kindle', 'digital']):
                results['warnings'].append("Very low price for physical product")

        # Image count check
        images = data.get('images', [])
        if len(images) < 3:
            results['warnings'].append("Few product images - may affect conversion")

    def detect_anomalies(self, current_data, asin, results):
        # Here you can compare with historical data to detect anomalies
        # To simplify the example, only basic checks are done here

        price = current_data.get('price')
        if price and price > 1000:
            results['warnings'].append("High price product - verify accuracy")

        review_count = current_data.get('review_count')
        if review_count and review_count > 10000:
            results['warnings'].append("Very high review count - verify data accuracy")

Data Cleaning and Standardization

import re

class DataCleaner:
    def __init__(self):
        self.price_patterns = [
            r'\$?([\d,]+\.?\d*)',
            r'Price:\s*\$?([\d,]+\.?\d*)',
            r'(\d+\.?\d*)\s*dollars?'
        ]

        self.rating_patterns = [
            r'(\d\.?\d*)\s*out\s*of\s*5',
            r'(\d\.?\d*)\s*stars?',
            r'Rating:\s*(\d\.?\d*)'
        ]

    def clean_price_data(self, price_text):
        if not price_text:
            return None

        # Remove currency symbols and extra text
        price_text = str(price_text).strip()

        for pattern in self.price_patterns:
            match = re.search(pattern, price_text)
            if match:
                price_str = match.group(1).replace(',', '')
                try:
                    return float(price_str)
                except ValueError:
                    continue

        return None

    def clean_rating_data(self, rating_text):
        if not rating_text:
            return None

        rating_text = str(rating_text).strip()

        # Direct number
        try:
            rating = float(rating_text)
            if 1.0 <= rating <= 5.0:
                return rating
        except ValueError:
            pass

        # Pattern matching
        for pattern in self.rating_patterns:
            match = re.search(pattern, rating_text)
            if match:
                try:
                    rating = float(match.group(1))
                    if 1.0 <= rating <= 5.0:
                        return rating
                except ValueError:
                    continue

        return None

    def clean_review_count(self, review_text):
        if not review_text:
            return 0

        review_text = str(review_text).strip()

        # Extract numbers (supports comma-separated large numbers)
        numbers = re.findall(r'[\d,]+', review_text)
        if numbers:
            try:
                return int(numbers[0].replace(',', ''))
            except ValueError:
                pass

        return 0

    def standardize_product_data(self, raw_data):
        cleaned_data = {}

        # Clean basic fields
        cleaned_data['asin'] = raw_data.get('asin', '').strip()
        cleaned_data['title'] = self.clean_title(raw_data.get('title'))
        cleaned_data['price'] = self.clean_price_data(raw_data.get('price'))
        cleaned_data['rating'] = self.clean_rating_data(raw_data.get('rating'))
        cleaned_data['review_count'] = self.clean_review_count(raw_data.get('review_count'))

        # Clean image URLs
        images = raw_data.get('images', [])
        cleaned_data['images'] = self.clean_image_urls(images)

        # Clean category information
        cleaned_data['category'] = self.clean_category(raw_data.get('category'))

        # Clean description
        cleaned_data['description'] = self.clean_description(raw_data.get('description'))

        return cleaned_data

    def clean_title(self, title):
        if not title:
            return ""

        title = str(title).strip()

        # Remove extra spaces
        title = re.sub(r'\s+', ' ', title)

        # Remove special characters (preserving basic punctuation)
        title = re.sub(r'[^\w\s\-\(\)\[\]&,.]', '', title)

        return title[:200]  # Limit length

    def clean_image_urls(self, images):
        if not images:
            return []

        cleaned_images = []
        for img_url in images:
            if img_url and isinstance(img_url, str):
                # Validate URL format
                if img_url.startswith(('http://', 'https://')):
                    cleaned_images.append(img_url.strip())

        return cleaned_images

    def clean_category(self, category):
        if not category:
            return ""

        category = str(category).strip()

        # Standardize category names
        category_mapping = {
            'electronics': 'Electronics',
            'books': 'Books',
            'home & garden': 'Home & Garden',
            'toys & games': 'Toys & Games'
        }

        return category_mapping.get(category.lower(), category)

    def clean_description(self, description):
        if not description:
            return ""

        description = str(description).strip()

        # Remove HTML tags
        description = re.sub(r'<[^>]+>', '', description)

        # Remove extra newlines and spaces
        description = re.sub(r'\n+', '\n', description)
        description = re.sub(r'\s+', ' ', description)

        return description[:1000]  # Limit length
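
A quick sketch showing the cleaner applied to messy raw field values (the inputs below are contrived examples):

cleaner = DataCleaner()

raw = {
    'asin': ' B08N5WRWNW ',
    'title': 'Echo   Dot (4th Gen) |  Smart speaker',
    'price': 'Price: $49.99',
    'rating': '4.7 out of 5 stars',
    'review_count': '12,345 ratings',
    'images': ['https://m.media-amazon.com/images/I/example.jpg', ''],
    'category': 'electronics',
    'description': '<p>Our most  popular\n\nsmart speaker</p>'
}

cleaned = cleaner.standardize_product_data(raw)
print(cleaned['price'], cleaned['rating'], cleaned['review_count'])  # 49.99 4.7 12345
print(cleaned['category'])                                           # Electronics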

Performance Optimization and Scaling Strategies

Concurrent Processing and Performance Tuning

When dealing with a large amount of product data, a reasonable concurrent processing strategy can significantly improve efficiency. However, it should be noted that excessively high concurrency may trigger Amazon’s anti-scraping mechanisms.

import asyncio

import aiohttp
from bs4 import BeautifulSoup

class HighPerformanceScraper:
    def __init__(self, max_concurrent=5, request_delay=2):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.session = None
        self.rate_limiter = asyncio.Semaphore(max_concurrent)

    async def init_session(self):
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(
            connector=connector, 
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )

    async def scrape_products_batch(self, asin_list):
        if not self.session:
            await self.init_session()

        tasks = []
        for asin in asin_list:
            task = asyncio.create_task(self.scrape_single_product(asin))
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results and exceptions
        processed_results = {}
        for asin, result in zip(asin_list, results):
            if isinstance(result, Exception):
                print(f"Failed to scrape {asin}: {result}")
                processed_results[asin] = None
            else:
                processed_results[asin] = result

        return processed_results

    async def scrape_single_product(self, asin):
        async with self.rate_limiter:
            url = f"https://www.amazon.com/dp/{asin}"

            try:
                await asyncio.sleep(self.request_delay)  # Control request frequency

                async with self.session.get(url) as response:
                    if response.status == 200:
                        html_content = await response.text()
                        # In a real application, you would parse the HTML here
                        return {"status": "success", "asin": asin} 
                    else:
                        print(f"HTTP {response.status} for {asin}")
                        return None

            except Exception as e:
                print(f"Request failed for {asin}: {e}")
                return None

    def parse_product_page(self, html_content):
        # Use previously defined parsing logic
        soup = BeautifulSoup(html_content, 'html.parser')

        # Simplified for this example, a real application should use complete parsing logic
        product_data = {
            'title': "Example Title", # self.extract_title(soup),
            'price': 19.99, # self.extract_price(soup),
            'rating': 4.5, # self.extract_rating(soup),
            # ... other fields
        }

        return product_data

    async def close_session(self):
        if self.session:
            await self.session.close()

# Usage example
async def run_batch_scraping():
    scraper = HighPerformanceScraper(max_concurrent=3, request_delay=1.5)

    asin_batch = [
        "B08N5WRWNW", "B07XJ8C8F5", "B079QHML21",
        "B07HZLHPKP", "B01E6AO69U", "B077SXQZJX"
    ]

    try:
        results = await scraper.scrape_products_batch(asin_batch)

        for asin, data in results.items():
            if data:
                print(f"✓ {asin}: Success")
            else:
                print(f"✗ {asin}: Failed to scrape")

    finally:
        await scraper.close_session()

# To run the async scraper:
# asyncio.run(run_batch_scraping())

Data Caching and Storage Optimization

A proper data caching strategy can not only improve response speed but also reduce unnecessary duplicate requests, lowering the risk of being banned.

import hashlib
import json
import time
from datetime import datetime, timedelta

import redis

class DataCache:
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.redis_client = redis.Redis(
            host=redis_host, 
            port=redis_port, 
            db=redis_db,
            decode_responses=True
        )
        self.default_ttl = 3600  # 1-hour cache

    def generate_cache_key(self, asin, data_type='product'):
        """Generate cache key"""
        key_string = f"{data_type}:{asin}"
        return hashlib.md5(key_string.encode()).hexdigest()

    def cache_product_data(self, asin, data, ttl=None):
        """Cache product data"""
        if ttl is None:
            ttl = self.default_ttl

        cache_key = self.generate_cache_key(asin)
        cache_data = {
            'data': data,
            'cached_at': datetime.now().isoformat(),
            'asin': asin
        }

        try:
            self.redis_client.setex(
                cache_key, 
                ttl, 
                json.dumps(cache_data, default=str)
            )
            return True
        except Exception as e:
            print(f"Failed to cache data for {asin}: {e}")
            return False

    def get_cached_product_data(self, asin, max_age_hours=1):
        """Get cached product data"""
        cache_key = self.generate_cache_key(asin)

        try:
            cached_data = self.redis_client.get(cache_key)
            if not cached_data:
                return None

            cache_info = json.loads(cached_data)
            cached_at = datetime.fromisoformat(cache_info['cached_at'])

            # Check data age
            if datetime.now() - cached_at > timedelta(hours=max_age_hours):
                self.redis_client.delete(cache_key)
                return None

            return cache_info['data']

        except Exception as e:
            print(f"Failed to get cached data for {asin}: {e}")
            return None

    def invalidate_cache(self, asin):
        """Invalidate the cache"""
        cache_key = self.generate_cache_key(asin)
        self.redis_client.delete(cache_key)

    def get_cache_stats(self):
        """Get cache statistics"""
        try:
            info = self.redis_client.info()
            return {
                'used_memory': info.get('used_memory_human'),
                'keyspace_hits': info.get('keyspace_hits'),
                'keyspace_misses': info.get('keyspace_misses'),
                'connected_clients': info.get('connected_clients')
            }
        except Exception as e:
            print(f"Failed to get cache stats: {e}")
            return {}

class CachedAmazonScraper:
    def __init__(self, api_client, cache_client):
        self.api_client = api_client
        self.cache = cache_client

    def get_product_data(self, asin, force_refresh=False):
        """Get product data, prioritizing the cache"""
        if not force_refresh:
            cached_data = self.cache.get_cached_product_data(asin)
            if cached_data:
                print(f"Cache hit for {asin}")
                return cached_data

        # Cache miss, get data from the API
        print(f"Cache miss for {asin}, fetching from API")
        fresh_data = self.api_client.scrape_product_details(asin)

        if fresh_data and fresh_data.get('code') == 0:
            product_data = fresh_data['data']

            # Cache the new data
            self.cache.cache_product_data(asin, product_data)

            return product_data

        return None

    def batch_get_products(self, asin_list, force_refresh=False):
        """Batch get product data"""
        results = {}
        api_requests_needed = []

        # Check cache
        for asin in asin_list:
            if not force_refresh:
                cached_data = self.cache.get_cached_product_data(asin)
                if cached_data:
                    results[asin] = cached_data
                    continue

            api_requests_needed.append(asin)

        print(f"Cache hits: {len(results)}, API requests needed: {len(api_requests_needed)}")

        # Batch request uncached data
        for asin in api_requests_needed:
            fresh_data = self.get_product_data(asin, force_refresh=True)
            results[asin] = fresh_data

            # Control request frequency
            time.sleep(1)

        return results
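
A brief usage sketch tying the two classes together; it assumes a local Redis instance is running, and PangolinAPIClient is the API client class used earlier in this article (the API key and ASINs are placeholders):

api_client = PangolinAPIClient('your_pangolin_api_key')
cache = DataCache(redis_host='localhost')
scraper = CachedAmazonScraper(api_client, cache)

# First call hits the API and fills the cache; repeat calls within the TTL are served from Redis
product = scraper.get_product_data('B08N5WRWNW')

batch = scraper.batch_get_products(['B08N5WRWNW', 'B07XJ8C8F5'])
print(cache.get_cache_stats())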

Summary and Best Practice Recommendations

After this in-depth technical analysis and the practical case studies above, it is clear that Amazon data scraping is a complex technical challenge. Successful scraping requires balancing several dimensions: implementation complexity, data quality assurance, compliance, and cost-effectiveness.

Summary of Technical Implementation Points

At the technical implementation level, the most critical points are: first, precise control of request frequency. This means more than a fixed time interval; it requires accounting for how much load different page types tolerate and dynamically adjusting the strategy based on response status. Second is session management: proper cookie handling and session-state maintenance can significantly improve both the success rate and the completeness of data acquisition.
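
As a concrete illustration, the sketch below shows one way such adaptive throttling could be structured; the per-page-type base delays and backoff factors are illustrative assumptions, not measured values.

import random
import time

class AdaptiveThrottler:
    # Assumed baseline delays (seconds) per page type - tune against your own observations
    BASE_DELAYS = {'detail': 2.0, 'search': 3.0, 'reviews': 4.0}

    def __init__(self):
        self.multiplier = 1.0  # grows after throttling signals, shrinks back after successes

    def wait(self, page_type):
        base = self.BASE_DELAYS.get(page_type, 3.0)
        # Add jitter so requests never settle into a fixed rhythm
        time.sleep(base * self.multiplier * random.uniform(0.8, 1.2))

    def record_response(self, status_code):
        if status_code in (429, 503):        # throttled or blocked: back off sharply
            self.multiplier = min(self.multiplier * 2, 16)
        elif status_code == 200:             # healthy response: recover gradually
            self.multiplier = max(self.multiplier * 0.9, 1.0)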

For data parsing, a multi-level fault-tolerant mechanism is a must. Changes to Amazon's page structure are the norm, and only sufficiently flexible and robust parsing rules let the system keep running stably through them. For dynamic content, choose the technical approach that best fits the specific requirement, balancing performance against functionality.
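
To make the idea concrete, here is a minimal sketch of a fallback-selector parser; the CSS selectors listed are common examples for illustration, not a guaranteed or exhaustive set.

from bs4 import BeautifulSoup

# Several candidate selectors per field, tried in order until one yields a value
FALLBACK_SELECTORS = {
    'title': ['#productTitle', 'h1#title', 'h1 span'],
    'price': ['.a-price .a-offscreen', '#priceblock_ourprice', '#priceblock_dealprice']
}

def extract_with_fallbacks(html, field):
    soup = BeautifulSoup(html, 'html.parser')
    for selector in FALLBACK_SELECTORS.get(field, []):
        element = soup.select_one(selector)
        if element and element.get_text(strip=True):
            return element.get_text(strip=True)
    return None  # every selector failed - log this so the rule set can be updated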

Data Quality Assurance System

Ensuring data quality requires a complete validation and cleaning system covering several stages: real-time validation of incoming records, comparison against historical data, and anomaly detection. In large-scale scraping scenarios in particular, automated data quality monitoring becomes essential. Only when data accuracy and consistency are guaranteed can subsequent analysis and decision-making be meaningful.
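
As a small illustration of the historical-comparison idea (and one possible way to flesh out the detect_anomalies placeholder above), a check like the following could flag suspicious jumps; the 50% threshold mirrors the anomaly_thresholds used earlier and is an assumption to tune.

def flag_price_anomaly(current_price, historical_prices, threshold_percent=50):
    # Returns a warning string if the new price deviates sharply from the recent average
    if not historical_prices or current_price is None:
        return None
    recent = historical_prices[-7:]
    recent_avg = sum(recent) / len(recent)
    if recent_avg == 0:
        return None
    change_percent = abs(current_price - recent_avg) / recent_avg * 100
    if change_percent > threshold_percent:
        return (f"Price changed {change_percent:.1f}% vs. the recent average "
                f"({recent_avg:.2f} -> {current_price:.2f}) - verify before use")
    return None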

Value Proposition of Professional Services

For most businesses, the technical investment and maintenance costs required to build a complete Amazon data scraping system are substantial. Professional API services not only provide stable, reliable data acquisition but, more importantly, let businesses focus their limited technical resources on core business logic.

Taking the Pangolin Scrape API as an example, its technical advantages in areas like Sponsored ad slot identification, “Customer Says” data acquisition, and multi-region data scraping are difficult for an ordinary in-house team to achieve in a short period. More importantly, professional services can continuously keep up with various changes on the Amazon platform, ensuring the stability and accuracy of data scraping.

Future Development Trends

As Amazon's anti-scraping technology continues to evolve, data scraping is moving in a more intelligent and specialized direction. Machine learning will play an increasingly important role in page structure recognition, anomaly detection, and data cleaning, while cloud-native scraping architectures will become mainstream, offering better scalability and stability.

For e-commerce practitioners, data-driven operational strategies have become a key competitive factor. Whether you build a system in-house or use a professional service, you need a complete pipeline for data scraping, processing, and analysis; only then can you maintain an advantage in fierce market competition.

Amazon data scraping is not just a technical problem, but a systems engineering project. It requires the perfect combination of technical capabilities, business understanding, and compliance awareness. I hope this article can provide you with valuable references and guidance in your exploration of this field. Remember, the value of data lies not in the difficulty of obtaining it, but in how to transform it into actionable business insights. In this data-driven era, mastering the correct data acquisition methods means mastering the initiative in the competition.

# Final complete example: Building a production-level data scraping system
import json
import random
import sqlite3
import time

class ProductionAmazonScraper:
    def __init__(self, config):
        self.config = config
        self.api_client = PangolinAPIClient(config['api_key'])
        self.cache = DataCache(**config['redis_config'])
        self.validator = DataQualityValidator()
        self.cleaner = DataCleaner()

        # Initialize database connection
        self.db = sqlite3.connect(config['db_path'])
        self.init_database()

    def init_database(self):
        """Initialize the database table structure"""
        cursor = self.db.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS scraped_products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asin TEXT UNIQUE NOT NULL,
                title TEXT,
                price REAL,
                rating REAL,
                review_count INTEGER,
                raw_data TEXT,
                validation_status TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        self.db.commit()

    def scrape_and_store_product(self, asin):
        """Complete product data scraping and storage process"""
        try:
            # 1. Try to get from cache
            cached_data = self.cache.get_cached_product_data(asin)
            if cached_data:
                return self.process_and_store_data(asin, cached_data, from_cache=True)

            # 2. Get raw data from API
            raw_response = self.api_client.scrape_product_details(asin)
            if not raw_response or raw_response.get('code') != 0:
                return {'success': False, 'error': 'API request failed'}

            raw_data = raw_response['data']

            # 3. Data validation
            validation_result = self.validator.validate_product_data(raw_data, asin)

            # 4. Data cleaning
            cleaned_data = self.cleaner.standardize_product_data(raw_data)

            # 5. Cache the cleaned data
            self.cache.cache_product_data(asin, cleaned_data)

            # 6. Store in the database
            return self.store_to_database(asin, cleaned_data, validation_result)

        except Exception as e:
            return {'success': False, 'error': f'Unexpected error: {str(e)}'}

    def process_and_store_data(self, asin, data, from_cache=False):
        """Process and store data"""
        try:
            # Validate cached data
            validation_result = self.validator.validate_product_data(data, asin)

            # Store in the database
            return self.store_to_database(asin, data, validation_result, from_cache)

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def store_to_database(self, asin, data, validation_result, from_cache=False):
        """Store data in the database"""
        cursor = self.db.cursor()

        try:
            cursor.execute('''
                INSERT OR REPLACE INTO scraped_products 
                (asin, title, price, rating, review_count, raw_data, validation_status, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', (
                asin,
                data.get('title'),
                data.get('price'),
                data.get('rating'),
                data.get('review_count'),
                json.dumps(data),
                'valid' if validation_result['is_valid'] else 'invalid'
            ))

            self.db.commit()

            return {
                'success': True,
                'asin': asin,
                'from_cache': from_cache,
                'validation_warnings': len(validation_result['warnings']),
                'validation_errors': len(validation_result['errors'])
            }

        except Exception as e:
            self.db.rollback()
            return {'success': False, 'error': f'Database error: {str(e)}'}

    def batch_scrape_products(self, asin_list, max_concurrent=3):
        """Batch scrape product data"""
        results = {}
        success_count = 0

        print(f"Starting batch scraping for {len(asin_list)} products...")

        for i, asin in enumerate(asin_list):
            print(f"Processing [{i+1}/{len(asin_list)}]: {asin}")

            result = self.scrape_and_store_product(asin)
            results[asin] = result

            if result['success']:
                success_count += 1
                status = "✓ Success"
                if result.get('from_cache'):
                    status += " (Cache)"
                if result.get('validation_warnings', 0) > 0:
                    status += f" ({result['validation_warnings']} warnings)"
            else:
                status = f"✗ Failure: {result.get('error', 'Unknown error')}"

            print(f"  {status}")

            # Control request frequency
            time.sleep(random.uniform(1.0, 2.0))

        print(f"\nBatch scraping complete: {success_count}/{len(asin_list)} successful")

        return results

    def get_scraping_report(self):
        """Generate a scraping report"""
        cursor = self.db.cursor()

        # Get statistics
        cursor.execute('''
            SELECT 
                COUNT(*) as total_products,
                COUNT(CASE WHEN validation_status = 'valid' THEN 1 END) as valid_products,
                AVG(price) as avg_price,
                AVG(rating) as avg_rating,
                MAX(updated_at) as last_update
            FROM scraped_products
        ''')

        stats = cursor.fetchone()

        # Get cache stats
        cache_stats = self.cache.get_cache_stats()

        return {
            'database_stats': {
                'total_products': stats[0],
                'valid_products': stats[1],
                'avg_price': round(stats[2] or 0, 2),
                'avg_rating': round(stats[3] or 0, 2),
                'last_update': stats[4]
            },
            'cache_stats': cache_stats
        }

# Usage example
def main():
    config = {
        'api_key': 'your_pangolin_api_key',
        'redis_config': {
            'redis_host': 'localhost',
            'redis_port': 6379,
            'redis_db': 0
        },
        'db_path': 'amazon_products.db'
    }

    scraper = ProductionAmazonScraper(config)

    # Example ASIN list
    test_asins = [
        "B08N5WRWNW",  # Echo Dot
        "B07XJ8C8F5",  # Fire TV Stick  
        "B079QHML21",  # Echo Show
        "B01E6AO69U",  # Kindle Paperwhite
        "B077SXQZJX"   # Echo Plus
    ]

    # Execute batch scraping
    results = scraper.batch_scrape_products(test_asins)

    # Generate report
    report = scraper.get_scraping_report()

    print("\n=== Scraping Report ===")
    print(f"Database Stats:")
    print(f"  Total Products: {report['database_stats']['total_products']}")
    print(f"  Valid Products: {report['database_stats']['valid_products']}")
    print(f"  Average Price: ${report['database_stats']['avg_price']}")
    print(f"  Average Rating: {report['database_stats']['avg_rating']}")

    if report['cache_stats']:
        print(f"Cache Stats:")
        print(f"  Memory Used: {report['cache_stats'].get('used_memory', 'N/A')}")
        print(f"  Cache Hits: {report['cache_stats'].get('keyspace_hits', 0)}")

if __name__ == "__main__":
    # main()
    pass

This complete implementation example shows the many aspects a production-level Amazon data scraping system has to consider. From data acquisition, validation, and cleaning to caching, storage, and monitoring, every stage needs to be carefully designed and implemented.
Such a system can not only stably and reliably acquire Amazon data but also maintain robustness in the face of various abnormal situations. More importantly, through a reasonable architectural design, the entire system has good scalability and can be flexibly adjusted and optimized according to business needs.
Remember, data scraping is just the first step; how to transform this data into valuable business insights is the ultimate goal. I hope this guide will help you go further and more steadily on the path of Amazon data scraping.
