Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ asyncio_mode = "auto"
[tool.mypy]
plugins = []
ignore_missing_imports = true
strict = false
strict = true
follow_imports = "silent"
strict_optional = false
disable_error_code = ["empty-body"]
exclude = ["doc/code/", "pyrit/auxiliary_attacks/"]

Expand Down
5 changes: 2 additions & 3 deletions pyrit/analytics/result_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats
raise TypeError(f"Expected AttackResult, got {type(attack).__name__}: {attack!r}")

outcome = attack.outcome
attack_type = (
attack.get_attack_strategy_identifier().class_name if attack.get_attack_strategy_identifier() else "unknown"
)
_strategy_id = attack.get_attack_strategy_identifier()
attack_type = _strategy_id.class_name if _strategy_id is not None else "unknown"

if outcome == AttackOutcome.SUCCESS:
overall_counts["successes"] += 1
Expand Down
4 changes: 2 additions & 2 deletions pyrit/auth/azure_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def get_access_token_from_interactive_login(scope: str) -> str:
"""
try:
token_provider = get_bearer_token_provider(InteractiveBrowserCredential(), scope)
return token_provider()
return str(token_provider())
except Exception as e:
logger.error(f"Failed to obtain token for '{scope}': {e}")
raise
Expand All @@ -320,7 +320,7 @@ def get_azure_token_provider(scope: str) -> Callable[[], str]:
>>> token = token_provider() # Get current token
"""
try:
return get_bearer_token_provider(DefaultAzureCredential(), scope)
return get_bearer_token_provider(DefaultAzureCredential(), scope) # type: ignore[no-any-return]
except Exception as e:
logger.error(f"Failed to obtain token provider for '{scope}': {e}")
raise
Expand Down
6 changes: 5 additions & 1 deletion pyrit/auth/copilot_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,15 @@ async def response_handler(response: Any) -> None:

logger.info("Waiting for email input...")
await page.wait_for_selector("#i0116", timeout=self._elements_timeout)
if self._username is None:
raise ValueError("Username is not set")
await page.fill("#i0116", self._username)
await page.click("#idSIButton9")

logger.info("Waiting for password input...")
await page.wait_for_selector("#i0118", timeout=self._elements_timeout)
if self._password is None:
raise ValueError("Password is not set")
await page.fill("#i0118", self._password)
await page.click("#idSIButton9")

Expand Down Expand Up @@ -450,7 +454,7 @@ async def response_handler(response: Any) -> None:
else:
logger.error(f"Failed to retrieve bearer token within {self._token_capture_timeout} seconds.")

return bearer_token # type: ignore[no-any-return]
return bearer_token
except Exception as e:
logger.error("Failed to retrieve access token using Playwright.")

Expand Down
2 changes: 1 addition & 1 deletion pyrit/backend/routes/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def serve_media_async(
# Determine allowed directory from memory results_path
try:
memory = CentralMemory.get_memory_instance()
allowed_root = Path(memory.results_path).resolve()
allowed_root = Path(memory.results_path or "").resolve()
except Exception as exc:
raise HTTPException(status_code=500, detail="Memory not initialized; cannot determine results path.") from exc

Expand Down
2 changes: 1 addition & 1 deletion pyrit/backend/routes/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def get_version_async(request: Request) -> VersionResponse:
memory = CentralMemory.get_memory_instance()
db_type = type(memory).__name__
db_name = None
if memory.engine.url.database:
if memory.engine is not None and memory.engine.url.database:
db_name = memory.engine.url.database.split("?")[0]
database_info = f"{db_type} ({db_name})" if db_name else f"{db_type} (None)"
except Exception as e:
Expand Down
8 changes: 4 additions & 4 deletions pyrit/cli/_banner.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,11 +566,11 @@ def _render_line_with_segments(
result: list[str] = []
current_role: Optional[ColorRole] = None
for pos, ch in enumerate(line):
role = char_roles[pos]
if role != current_role:
color = _get_color(role, theme) if role else reset
char_role = char_roles[pos]
if char_role != current_role:
color = _get_color(char_role, theme) if char_role else reset
result.append(color)
current_role = role
current_role = char_role
result.append(ch)
result.append(reset)
return "".join(result)
Expand Down
8 changes: 5 additions & 3 deletions pyrit/cli/frontend_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class termcolor: # type: ignore[no-redef] # noqa: N801
"""Dummy termcolor fallback for colored printing if termcolor is not installed."""

@staticmethod
def cprint(text: str, color: str = None, attrs: list = None) -> None: # type: ignore[type-arg]
def cprint(text: str, color: Optional[str] = None, attrs: Optional[list[Any]] = None) -> None:
"""Print text without color."""
print(text)

Expand Down Expand Up @@ -187,7 +187,8 @@ def scenario_registry(self) -> ScenarioRegistry:
raise RuntimeError(
"FrontendCore not initialized. Call 'await context.initialize_async()' before accessing registries."
)
assert self._scenario_registry is not None
if self._scenario_registry is None:
raise ValueError("self._scenario_registry is not initialized")
return self._scenario_registry

@property
Expand All @@ -202,7 +203,8 @@ def initializer_registry(self) -> InitializerRegistry:
raise RuntimeError(
"FrontendCore not initialized. Call 'await context.initialize_async()' before accessing registries."
)
assert self._initializer_registry is not None
if self._initializer_registry is None:
raise ValueError("self._initializer_registry is not initialized")
return self._initializer_registry


Expand Down
2 changes: 1 addition & 1 deletion pyrit/common/data_url_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def convert_local_image_to_data_url(image_path: str) -> str:
str: A string containing the MIME type and the base64-encoded data of the image, formatted as a data URL.
"""
ext = DataTypeSerializer.get_extension(image_path)
if ext.lower() not in AZURE_OPENAI_GPT4O_SUPPORTED_IMAGE_FORMATS:
if not ext or ext.lower() not in AZURE_OPENAI_GPT4O_SUPPORTED_IMAGE_FORMATS:
raise ValueError(
f"Unsupported image format: {ext}. Supported formats are: {AZURE_OPENAI_GPT4O_SUPPORTED_IMAGE_FORMATS}"
)
Expand Down
106 changes: 54 additions & 52 deletions pyrit/common/display_response.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,54 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import io
import logging

from PIL import Image

from pyrit.common.notebook_utils import is_in_ipython_session
from pyrit.models import AzureBlobStorageIO, DiskStorageIO, MessagePiece

logger = logging.getLogger(__name__)


async def display_image_response(response_piece: MessagePiece) -> None:
"""
Display response images if running in notebook environment.

Args:
response_piece (MessagePiece): The response piece to display.
"""
from pyrit.memory import CentralMemory

memory = CentralMemory.get_memory_instance()
if (
response_piece.response_error == "none"
and response_piece.converted_value_data_type == "image_path"
and is_in_ipython_session()
):
image_location = response_piece.converted_value

try:
image_bytes = await memory.results_storage_io.read_file(image_location)
except Exception as e:
if isinstance(memory.results_storage_io, AzureBlobStorageIO):
try:
# Fallback to reading from disk if the storage IO fails
image_bytes = await DiskStorageIO().read_file(image_location)
except Exception as exc:
logger.error(f"Failed to read image from {image_location}. Full exception: {str(exc)}")
return
else:
logger.error(f"Failed to read image from {image_location}. Full exception: {str(e)}")
return

image_stream = io.BytesIO(image_bytes)
image = Image.open(image_stream)

# Jupyter built-in display function only works in notebooks.
display(image) # type: ignore[name-defined] # noqa: F821
if response_piece.response_error == "blocked":
logger.info("---\nContent blocked, cannot show a response.\n---")
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import io
import logging

from PIL import Image

from pyrit.common.notebook_utils import is_in_ipython_session
from pyrit.models import AzureBlobStorageIO, DiskStorageIO, MessagePiece

logger = logging.getLogger(__name__)


async def display_image_response(response_piece: MessagePiece) -> None:
"""
Display response images if running in notebook environment.

Args:
response_piece (MessagePiece): The response piece to display.
"""
from pyrit.memory import CentralMemory

memory = CentralMemory.get_memory_instance()
if (
response_piece.response_error == "none"
and response_piece.converted_value_data_type == "image_path"
and is_in_ipython_session()
):
image_location = response_piece.converted_value

try:
if memory.results_storage_io is None:
raise RuntimeError("Storage IO not initialized")
image_bytes = await memory.results_storage_io.read_file(image_location)
except Exception as e:
if isinstance(memory.results_storage_io, AzureBlobStorageIO):
try:
# Fallback to reading from disk if the storage IO fails
image_bytes = await DiskStorageIO().read_file(image_location)
except Exception as exc:
logger.error(f"Failed to read image from {image_location}. Full exception: {str(exc)}")
return
else:
logger.error(f"Failed to read image from {image_location}. Full exception: {str(e)}")
return

image_stream = io.BytesIO(image_bytes)
image = Image.open(image_stream)

# Jupyter built-in display function only works in notebooks.
display(image) # type: ignore[name-defined] # noqa: F821
if response_piece.response_error == "blocked":
logger.info("---\nContent blocked, cannot show a response.\n---")
16 changes: 8 additions & 8 deletions pyrit/common/net_utility.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from typing import Any, Literal, Optional, overload
from typing import Any, Literal, Optional, cast, overload
from urllib.parse import parse_qs, urlparse, urlunparse

import httpx
Expand All @@ -10,18 +10,18 @@

Comment on lines 4 to 10
@overload
def get_httpx_client(
use_async: Literal[True], debug: bool = False, **httpx_client_kwargs: Optional[Any]
use_async: Literal[True], debug: bool = False, **httpx_client_kwargs: Any
) -> httpx.AsyncClient: ...


@overload
def get_httpx_client(
use_async: Literal[False] = False, debug: bool = False, **httpx_client_kwargs: Optional[Any]
use_async: Literal[False] = False, debug: bool = False, **httpx_client_kwargs: Any
) -> httpx.Client: ...


def get_httpx_client(
use_async: bool = False, debug: bool = False, **httpx_client_kwargs: Optional[Any]
use_async: bool = False, debug: bool = False, **httpx_client_kwargs: Any
) -> httpx.Client | httpx.AsyncClient:
"""
Get the httpx client for making requests.
Expand All @@ -32,10 +32,10 @@ def get_httpx_client(
client_class = httpx.AsyncClient if use_async else httpx.Client
proxy = "http://localhost:8080" if debug else None

proxy = httpx_client_kwargs.pop("proxy", proxy)
verify_certs = httpx_client_kwargs.pop("verify", not debug)
proxy = cast(Optional[str], httpx_client_kwargs.pop("proxy", proxy))
verify_certs = cast(bool, httpx_client_kwargs.pop("verify", not debug))
# fun notes; httpx default is 5 seconds, httpclient is 100, urllib in indefinite
timeout = httpx_client_kwargs.pop("timeout", 60.0)
timeout = cast(float, httpx_client_kwargs.pop("timeout", 60.0))

return client_class(proxy=proxy, verify=verify_certs, timeout=timeout, **httpx_client_kwargs)

Expand Down Expand Up @@ -92,7 +92,7 @@ async def make_request_and_raise_if_error_async(
request_body: Optional[dict[str, object]] = None,
files: Optional[dict[str, tuple[str, bytes, str]]] = None,
headers: Optional[dict[str, str]] = None,
**httpx_client_kwargs: Optional[Any],
**httpx_client_kwargs: Any,
) -> httpx.Response:
"""
Make a request and raise an exception if it fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,13 @@ async def _fetch_and_save_image_async(self, image_url: str, behavior_id: str) ->
serializer = data_serializer_factory(category="seed-prompt-entries", data_type="image_path", extension="png")

# Return existing path if image already exists for this BehaviorID
serializer.value = str(serializer._memory.results_path + serializer.data_sub_directory + f"/{filename}")
results_path = serializer._memory.results_path
results_storage_io = serializer._memory.results_storage_io
if not results_path or results_storage_io is None:
raise RuntimeError("[HarmBench-Multimodal] Serializer memory is not properly configured: results_path and results_storage_io must be set.")
serializer.value = str(results_path + serializer.data_sub_directory + f"/{filename}")
try:
if await serializer._memory.results_storage_io.path_exists(serializer.value):
if await results_storage_io.path_exists(serializer.value):
return serializer.value
except Exception as e:
logger.warning(f"[HarmBench-Multimodal] Failed to check if image for {behavior_id} exists in cache: {e}")
Expand Down
18 changes: 12 additions & 6 deletions pyrit/datasets/seed_datasets/remote/vlsu_multimodal_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ async def fetch_dataset(self, *, cache: bool = True) -> SeedDataset:
group_id = uuid.uuid4()

try:
if image_url is None or text is None:
continue
local_image_path = await self._fetch_and_save_image_async(image_url, str(group_id))

# Create text prompt (sequence=0, sent first)
Expand All @@ -179,13 +181,13 @@ async def fetch_dataset(self, *, cache: bool = True) -> SeedDataset:
data_type="text",
name="ML-VLSU Text",
dataset_name=self.dataset_name,
harm_categories=[combined_category],
harm_categories=[combined_category or ""],
description="Text component of ML-VLSU multimodal prompt.",
source=self.source,
prompt_group_id=group_id,
sequence=0,
metadata={
"category": combined_category,
"category": combined_category or "",
"text_grade": text_grade,
"image_grade": image_grade,
"combined_grade": combined_grade,
Expand All @@ -198,13 +200,13 @@ async def fetch_dataset(self, *, cache: bool = True) -> SeedDataset:
data_type="image_path",
name="ML-VLSU Image",
dataset_name=self.dataset_name,
harm_categories=[combined_category],
harm_categories=[combined_category or ""],
description="Image component of ML-VLSU multimodal prompt.",
source=self.source,
prompt_group_id=group_id,
sequence=1,
metadata={
"category": combined_category,
"category": combined_category or "",
"text_grade": text_grade,
"image_grade": image_grade,
"combined_grade": combined_grade,
Expand Down Expand Up @@ -245,9 +247,13 @@ async def _fetch_and_save_image_async(self, image_url: str, group_id: str) -> st
serializer = data_serializer_factory(category="seed-prompt-entries", data_type="image_path", extension="png")

# Return existing path if image already exists
serializer.value = str(serializer._memory.results_path + serializer.data_sub_directory + f"/{filename}")
results_path = serializer._memory.results_path
results_storage_io = serializer._memory.results_storage_io
if not results_path or results_storage_io is None:
raise RuntimeError("[ML-VLSU] Serializer memory is not properly configured.")
serializer.value = str(results_path + serializer.data_sub_directory + f"/{filename}")
try:
if await serializer._memory.results_storage_io.path_exists(serializer.value):
if await results_storage_io.path_exists(serializer.value):
return serializer.value
except Exception as e:
logger.warning(f"[ML-VLSU] Failed to check if image for {group_id} exists in cache: {e}")
Expand Down
2 changes: 1 addition & 1 deletion pyrit/embedding/openai_text_embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(
# Create async client - type: ignore needed because get_required_value returns str
# but api_key parameter accepts str | Callable[[], str | Awaitable[str]]
self._async_client = AsyncOpenAI(
api_key=api_key, # type: ignore[arg-type]
api_key=api_key,
base_url=endpoint,
)

Expand Down
3 changes: 2 additions & 1 deletion pyrit/executor/attack/core/attack_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ async def from_seed_group_async(
seed_group.validate()

# SeedAttackGroup validates in __init__ that objective is set
assert seed_group.objective is not None
if seed_group.objective is None:
raise ValueError("seed_group.objective is not initialized")

# Build params dict, only including fields this class accepts
params: dict[str, Any] = {}
Expand Down
Loading