Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 184 additions & 26 deletions percy/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import platform
import json
from contextlib import contextmanager
from functools import lru_cache
from time import sleep
from urllib.parse import urlparse, urljoin
import requests

from selenium.webdriver import __version__ as SELENIUM_VERSION
Expand All @@ -15,11 +17,15 @@
CLIENT_INFO = 'percy-selenium-python/' + SDK_VERSION
ENV_INFO = ['selenium/' + SELENIUM_VERSION, 'python/' + platform.python_version()]

def _get_bool_env(key):
return os.environ.get(key, "").lower() == "true"

# Maybe get the CLI API address from the environment
PERCY_CLI_API = os.environ.get('PERCY_CLI_API') or 'http://localhost:5338'
PERCY_DEBUG = os.environ.get('PERCY_LOGLEVEL') == 'debug'
RESONSIVE_CAPTURE_SLEEP_TIME = os.environ.get('RESONSIVE_CAPTURE_SLEEP_TIME')

RESPONSIVE_CAPTURE_SLEEP_TIME = os.environ.get('RESPONSIVE_CAPTURE_SLEEP_TIME')
PERCY_RESPONSIVE_CAPTURE_MIN_HEIGHT = _get_bool_env("PERCY_RESPONSIVE_CAPTURE_MIN_HEIGHT")
PERCY_RESPONSIVE_CAPTURE_RELOAD_PAGE = _get_bool_env("PERCY_RESPONSIVE_CAPTURE_RELOAD_PAGE")
# for logging
LABEL = '[\u001b[35m' + ('percy:python' if PERCY_DEBUG else 'percy') + '\u001b[39m]'
CDP_SUPPORT_SELENIUM = (str(SELENIUM_VERSION)[0].isdigit() and int(
Expand Down Expand Up @@ -131,23 +137,125 @@ def create_region(

return region

def get_serialized_dom(driver, cookies, **kwargs):
@contextmanager
def iframe_context(driver, frame_element):
"""Safely switches to an iframe and always switches back to the parent."""
driver.switch_to.frame(frame_element)
try:
yield
finally:
driver.switch_to.parent_frame()

def process_frame(driver, frame_element, options, percy_dom_script):
"""Processes a single cross-origin frame to capture its snapshot."""
frame_url = frame_element.get_attribute('src') or "unknown-src"
with iframe_context(driver, frame_element):
try:
# Inject Percy DOM into the cross-origin frame context
driver.execute_script(percy_dom_script)
# Serialize inside the frame.
# enableJavaScript=True is required to handle CORS iframes manually.
iframe_options = {**options, 'enableJavaScript': True}
iframe_snapshot = driver.execute_script(
f"return PercyDOM.serialize({json.dumps(iframe_options)})"
)
except Exception as e:
log(f"Failed to process cross-origin frame {frame_url}: {e}", "debug")
return None
# Back in parent context: find the percyElementId created by the main page serialization
percy_element_id = frame_element.get_attribute('data-percy-element-id')
if not percy_element_id:
log(f"Skipping frame {frame_url}: no matching percyElementId found", "debug")
return None
return {
"iframeData": {"percyElementId": percy_element_id},
"iframeSnapshot": iframe_snapshot,
"frameUrl": frame_url
}


def _is_unsupported_iframe_src(frame_src):
return (
not frame_src or
frame_src == "about:blank" or
frame_src.startswith("javascript:") or
frame_src.startswith("data:") or
frame_src.startswith("vbscript:")
)


def _get_origin(url):
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"

def get_serialized_dom(driver, cookies, percy_dom_script=None, **kwargs):
# 1. Serialize the main page first (this adds the data-percy-element-ids)
dom_snapshot = driver.execute_script(f'return PercyDOM.serialize({json.dumps(kwargs)})')
# 2. Process CORS IFrames
try:
page_origin = _get_origin(driver.current_url)
iframes = driver.find_elements("tag name", "iframe")
if iframes and percy_dom_script:
processed_frames = []
for frame in iframes:
frame_src = frame.get_attribute('src')
if _is_unsupported_iframe_src(frame_src):
continue

try:
frame_origin = _get_origin(urljoin(driver.current_url, frame_src))
except Exception as e:
log(f"Skipping iframe \"{frame_src}\": {e}", "debug")
continue

if frame_origin == page_origin:
continue

result = process_frame(driver, frame, kwargs, percy_dom_script)
if result:
processed_frames.append(result)

if processed_frames:
dom_snapshot['corsIframes'] = processed_frames
except Exception as e:
log(f"Failed to process cross-origin iframes: {e}", "debug")

dom_snapshot['cookies'] = cookies
return dom_snapshot

def get_widths_for_multi_dom(eligible_widths, **kwargs):
user_passed_widths = kwargs.get('widths', [])
width = kwargs.get('width')
if width: user_passed_widths = [width]

# Deep copy mobile widths otherwise it will get overridden
allWidths = eligible_widths.get('mobile', [])[:]
if len(user_passed_widths) != 0:
allWidths.extend(user_passed_widths)
else:
allWidths.extend(eligible_widths.get('config', []))
return list(set(allWidths))
def get_responsive_widths(widths=None):
if widths is None:
widths = []
try:
widths_list = widths if isinstance(widths, list) else []
query_param = f"?widths={','.join(map(str, widths_list))}" if widths_list else ""
response = requests.get(
f"{PERCY_CLI_API}/percy/widths-config{query_param}",
timeout=30
)
response.raise_for_status()
data = response.json()
widths_data = data.get("widths")
if not isinstance(widths_data, list):
msg = "Update Percy CLI to the latest version to use responsiveSnapshotCapture"
raise Exception(msg)
return widths_data
except Exception as e:
log(f"Failed to get responsive widths: {e}.", "debug")
msg = "Update Percy CLI to the latest version to use responsiveSnapshotCapture"
raise Exception(msg) from e

def _setup_resize_listener(driver):
"""Initializes the resize counter and attaches a named listener to avoid duplicates."""
driver.execute_script("""
const handler = window._percyResizeHandler;
if (handler) {
window.removeEventListener('resize', handler);
}
window._percyResizeHandler = () => { window.resizeCount++; };
window.resizeCount = 0;
window.addEventListener('resize', window._percyResizeHandler);
""")

def change_window_dimension_and_wait(driver, width, height, resizeCount):
try:
Expand All @@ -167,24 +275,66 @@ def change_window_dimension_and_wait(driver, width, height, resizeCount):
except TimeoutException:
log(f"Timed out waiting for window resize event for width {width}", 'debug')


def capture_responsive_dom(driver, eligible_widths, cookies, **kwargs):
widths = get_widths_for_multi_dom(eligible_widths, **kwargs)
def _responsive_sleep():
if not RESPONSIVE_CAPTURE_SLEEP_TIME:
return
try:
secs = int(RESPONSIVE_CAPTURE_SLEEP_TIME)
if secs > 0:
sleep(secs)
except (TypeError, ValueError):
pass

def capture_responsive_dom(driver, cookies, config, percy_dom_script=None, **kwargs):
widths = get_responsive_widths(kwargs.get('widths'))
log(widths, 'debug')
dom_snapshots = []
window_size = driver.get_window_size()
current_width, current_height = window_size['width'], window_size['height']
last_window_width = current_width
resize_count = 0
# Initialize resize listener once before the loop
_setup_resize_listener(driver)
driver.execute_script("PercyDOM.waitForResize()")

for width in widths:
target_height = current_height

if PERCY_RESPONSIVE_CAPTURE_MIN_HEIGHT:
min_height = kwargs.get('minHeight') or config.get('snapshot', {}).get('minHeight')
if min_height:
try:
min_height = int(min_height)
except (TypeError, ValueError):
log(
f'Invalid minHeight value {min_height!r}; expected integer, '
'using current window height instead.',
'debug',
)
else:
target_height = driver.execute_script(
f"return window.outerHeight - window.innerHeight + {min_height}")
log(
f'Calculated height for responsive capture using minHeight: {target_height}',
'debug')

for width_dict in widths:
width = width_dict['width']
height = width_dict.get('height', target_height)
if last_window_width != width:
resize_count += 1
change_window_dimension_and_wait(driver, width, current_height, resize_count)
change_window_dimension_and_wait(driver, width, height, resize_count)
last_window_width = width

if RESONSIVE_CAPTURE_SLEEP_TIME: sleep(int(RESONSIVE_CAPTURE_SLEEP_TIME))
dom_snapshot = get_serialized_dom(driver, cookies, **kwargs)
if PERCY_RESPONSIVE_CAPTURE_RELOAD_PAGE:
log(f'Reloading page for width: {width}', 'debug')
driver.refresh()
driver.execute_script(percy_dom_script)
_setup_resize_listener(driver)
driver.execute_script("PercyDOM.waitForResize();")
resize_count = 0 # Reset count because the listener just started fresh

_responsive_sleep()
dom_snapshot = get_serialized_dom(
driver, cookies, percy_dom_script=percy_dom_script, **kwargs)
dom_snapshot['width'] = width
dom_snapshots.append(dom_snapshot)

Expand All @@ -211,14 +361,22 @@ def percy_snapshot(driver, name, **kwargs):

try:
# Inject the DOM serialization script
driver.execute_script(fetch_percy_dom())
percy_dom_script = fetch_percy_dom()
driver.execute_script(percy_dom_script)
cookies = driver.get_cookies()

# Serialize and capture the DOM
if is_responsive_snapshot_capture(data['config'], **kwargs):
dom_snapshot = capture_responsive_dom(driver, data['widths'], cookies, **kwargs)
dom_snapshot = capture_responsive_dom(
driver=driver,
cookies=cookies,
config=data['config'],
percy_dom_script=percy_dom_script,
**kwargs,
)
else:
dom_snapshot = get_serialized_dom(driver, cookies, **kwargs)
dom_snapshot = get_serialized_dom(
driver, cookies, percy_dom_script=percy_dom_script, **kwargs)

# Post the DOM to the snapshot endpoint with snapshot options and other info
response = requests.post(f'{PERCY_CLI_API}/percy/snapshot', json={**kwargs, **{
Expand Down
2 changes: 1 addition & 1 deletion percy/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.1.3-beta.0"
__version__ = "2.1.4-alpha.0"
6 changes: 3 additions & 3 deletions tests/test_driver_metadata.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# pylint: disable=[abstract-class-instantiated, arguments-differ]
import unittest
from unittest.mock import patch
from selenium.webdriver import Remote
from selenium.webdriver.remote.webdriver import WebDriver

from percy.driver_metadata import DriverMetaData

class TestDriverMetadata(unittest.TestCase):
@patch('selenium.webdriver.Remote')
@patch('selenium.webdriver.remote.webdriver.WebDriver')
@patch('percy.cache.Cache.CACHE', {})
def setUp(self, mock_webdriver) -> None:
mock_webdriver.__class__ = Remote
mock_webdriver.__class__ = WebDriver # pylint: disable=invalid-class-object
self.mock_webdriver = mock_webdriver
self.mock_webdriver.session_id = 'session_id_123'
self.mock_webdriver.command_executor._url = 'https://example-hub:4444/wd/hub' # pylint: disable=W0212
Expand Down
Loading
Loading