# Python 3.12
# It is very specific for the structure of my website.
# It will not work for other websites, but I wanted to practice web scraping
# with Playwright and BeautifulSoup.
# The next step is using the code of Mathur et al. (2019) but some
# modifications need to be done.
import asyncio
import csv
import os
import pdb  # NOTE(review): unused here; presumably kept for interactive debugging
import re
from urllib.parse import urljoin

import aiohttp
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

SHOP_URLS = [
    "https://kevin-maurin.com/shop/shop1.php",
    "https://kevin-maurin.com/shop/shop2.php",
    "https://kevin-maurin.com/shop/shop3.php",
]

IMAGE_DIR = "product_images"
os.makedirs(IMAGE_DIR, exist_ok=True)

# Compiled once: matches elements whose class attribute contains the
# whole word "product" (e.g. "product", "product featured").
_PRODUCT_CLASS_RE = re.compile(r'\bproduct\b')


async def download_image(session, url, filename):
    """Download *url* to *filename* via *session*.

    Returns True on success, False on any HTTP or I/O failure.
    Best-effort: failures are logged, never raised, so one broken
    image cannot abort the whole scrape.
    """
    try:
        async with session.get(url) as resp:
            if resp.status == 200:
                with open(filename, "wb") as f:
                    f.write(await resp.read())
                # BUG FIX: the original printed the literal "(unknown)"
                # instead of the file that was actually written.
                print(f"Downloaded image: {filename}")
                return True
            print(f"Failed to download {url}: HTTP {resp.status}")
    except Exception as e:
        print(f"Failed to download {url}: {e}")
    return False


def extract_text_blocks(card):
    """Return the non-empty text of each direct child element of *card*.

    Only direct children (recursive=False) are considered; NavigableString
    children have no ``get_text`` attribute and are skipped.
    """
    blocks = []
    for child in card.find_all(recursive=False):
        if getattr(child, 'get_text', None):
            txt = child.get_text(" ", strip=True)
            if txt:
                blocks.append(txt)
    return blocks


async def extract_products(page, url, session):
    """Render one shop page and return a list of product dicts.

    Each dict has keys: "title", "img" (raw src attribute),
    "img_local" (path the image was saved to, or ""), "text_blocks".
    Product images are downloaded into IMAGE_DIR as a side effect.
    """
    await page.goto(url)
    # Wait until the network goes quiet so JS-inserted content is present.
    await page.wait_for_load_state('networkidle')
    html = await page.content()
    soup = BeautifulSoup(html, "html.parser")

    # Hoisted out of the loop: the original did this O(n) lookup per product.
    shop_no = SHOP_URLS.index(url) + 1

    products = []
    for idx, card in enumerate(soup.find_all(class_=_PRODUCT_CLASS_RE), start=1):
        # Cache find() results; the original queried each tag twice.
        h3 = card.find('h3')
        title = h3.get_text(strip=True) if h3 else ""

        img_tag = card.find('img')
        # .get() instead of ['src']: an <img> without src no longer raises.
        img = img_tag.get('src', "") if img_tag else ""
        img_url = urljoin(url, img) if img else ""
        img_filename = (
            f"{IMAGE_DIR}/shop{shop_no}_prod{idx}_{os.path.basename(img)}"
            if img else ""
        )
        if img_url and img_filename:
            await download_image(session, img_url, img_filename)

        products.append({
            "title": title,
            "img": img,
            "img_local": img_filename,
            "text_blocks": extract_text_blocks(card),
        })
    return products


async def main():
    """Scrape every shop in SHOP_URLS and write products_all_texts.csv."""
    results = []
    async with async_playwright() as p, aiohttp.ClientSession() as session:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={"width": 1400, "height": 1000}
        )
        page = await context.new_page()
        for url in SHOP_URLS:
            print(f"Scraping {url} ...")
            products = await extract_products(page, url, session)
            results.append({"url": url, "products": products})
        await browser.close()

    # Widest product determines the column count so the CSV is rectangular.
    max_blocks = max(
        (len(prod["text_blocks"])
         for shop in results
         for prod in shop["products"]),
        default=0,  # no products scraped at all
    )

    with open("products_all_texts.csv", "w", newline='', encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        header = (
            ["shop_url", "product_title", "image_src", "image_local"]
            + [f"text_{i+1}" for i in range(max_blocks)]
        )
        writer.writerow(header)
        for shop in results:
            for prod in shop["products"]:
                blocks = prod["text_blocks"]
                # Right-pad shorter products with empty cells.
                writer.writerow(
                    [shop["url"], prod["title"], prod["img"], prod["img_local"]]
                    + blocks
                    + [""] * (max_blocks - len(blocks))
                )
    print("CSV written: products_all_texts.csv")


if __name__ == "__main__":
    asyncio.run(main())