Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 51 additions & 17 deletions python/semantic_kernel/core_plugins/http_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,73 @@ class HttpPlugin(KernelBaseModel):
"""A plugin that provides HTTP functionality.

Usage:
kernel.add_plugin(HttpPlugin(), "http")

# With allowed domains for security:
# With allowed domains (recommended):
kernel.add_plugin(HttpPlugin(allowed_domains=["example.com", "api.example.com"]), "http")

# Explicitly allow all domains (opt-in, less secure):
kernel.add_plugin(HttpPlugin(allow_all_domains=True), "http")

Examples:
{{http.getAsync $url}}
{{http.postAsync $url}}
{{http.putAsync $url}}
{{http.deleteAsync $url}}

Security:
- By default, all requests are blocked unless ``allowed_domains`` is provided
or ``allow_all_domains`` is set to True.
- When ``allowed_domains`` is set, HTTP redirects are disabled to prevent
redirect-based domain bypass (SSRF).
Comment on lines +33 to +34
- Only ``http`` and ``https`` URL schemes are permitted.
"""

allowed_domains: set[str] | None = None
"""List of allowed domains to send requests to. If None, all domains are allowed."""
"""Set of allowed domains to send requests to."""

allow_all_domains: bool = False
"""When True, requests to any domain are allowed. Must be explicitly set."""

_ALLOWED_SCHEMES: frozenset[str] = frozenset({"http", "https"})

def _is_uri_allowed(self, url: str) -> bool:
"""Check if the URL's host is in the allowed domains list.
"""Check if the URL's host and scheme are permitted.

Args:
url: The URL to check.

Returns:
True if the URL is allowed, False otherwise.
"""
if self.allowed_domains is None:
return True

parsed = urlparse(url)

# Validate scheme
if parsed.scheme.lower() not in self._ALLOWED_SCHEMES:
return False

host = parsed.hostname
if host is None:
if not host:
return False

# Case-insensitive comparison
return host.lower() in {domain.lower() for domain in self.allowed_domains}
# If allow_all_domains is set, skip domain check
if self.allow_all_domains:
return True

# If allowed_domains is set, check against it
if self.allowed_domains is not None:
return host.lower() in {domain.lower() for domain in self.allowed_domains}

# Default: deny all
return False

def _validate_url(self, url: str) -> None:
"""Validate the URL, checking if it's not empty and is in the allowed domains.
"""Validate the URL, checking scheme, emptiness, and allowed domains.

Args:
url: The URL to validate.

Raises:
FunctionExecutionException: If the URL is empty or not in the allowed domains.
FunctionExecutionException: If the URL is empty, uses a disallowed scheme,
or targets a domain that is not allowed.
"""
if not url:
raise FunctionExecutionException("url cannot be `None` or empty")
Expand All @@ -77,7 +101,11 @@ async def get(self, url: Annotated[str, "The URL to send the request to."]) -> s
"""
self._validate_url(url)

async with aiohttp.ClientSession() as session, session.get(url, raise_for_status=True) as response:
allow_redirects = self.allow_all_domains or self.allowed_domains is None
async with (
aiohttp.ClientSession() as session,
session.get(url, raise_for_status=True, allow_redirects=allow_redirects) as response,
Comment on lines 102 to +107
):
return await response.text()

@kernel_function(description="Makes a POST request to a uri", name="postAsync")
Expand All @@ -98,9 +126,10 @@ async def post(

headers = {"Content-Type": "application/json"}
data = json.dumps(body) if body is not None else None
allow_redirects = self.allow_all_domains or self.allowed_domains is None
async with (
aiohttp.ClientSession() as session,
session.post(url, headers=headers, data=data, raise_for_status=True) as response,
session.post(url, headers=headers, data=data, raise_for_status=True, allow_redirects=allow_redirects) as response,
):
return await response.text()

Expand All @@ -123,9 +152,10 @@ async def put(

headers = {"Content-Type": "application/json"}
data = json.dumps(body) if body is not None else None
allow_redirects = self.allow_all_domains or self.allowed_domains is None
async with (
aiohttp.ClientSession() as session,
session.put(url, headers=headers, data=data, raise_for_status=True) as response,
session.put(url, headers=headers, data=data, raise_for_status=True, allow_redirects=allow_redirects) as response,
):
return await response.text()

Expand All @@ -141,5 +171,9 @@ async def delete(self, url: Annotated[str, "The URI to send the request to."]) -
"""
self._validate_url(url)

async with aiohttp.ClientSession() as session, session.delete(url, raise_for_status=True) as response:
allow_redirects = self.allow_all_domains or self.allowed_domains is None
async with (
aiohttp.ClientSession() as session,
session.delete(url, raise_for_status=True, allow_redirects=allow_redirects) as response,
):
return await response.text()
160 changes: 148 additions & 12 deletions python/tests/unit/core_plugins/test_http_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


async def test_it_can_be_instantiated():
plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
assert plugin is not None


Expand All @@ -23,7 +23,7 @@ async def test_it_can_be_instantiated_with_allowed_domains():

async def test_it_can_be_imported():
kernel = Kernel()
plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
kernel.add_plugin(plugin, "http")
assert kernel.get_plugin(plugin_name="http") is not None
assert kernel.get_plugin(plugin_name="http").name == "http"
Expand All @@ -36,20 +36,20 @@ async def test_get(mock_get):
mock_get.return_value.__aenter__.return_value.text.return_value = "Hello"
mock_get.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
response = await plugin.get("https://example.org/get")
assert response == "Hello"


@pytest.mark.parametrize("method", ["get", "post", "put", "delete"])
async def test_fail_no_url(method):
plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
with pytest.raises(FunctionExecutionException):
await getattr(plugin, method)(url="")


async def test_get_none_url():
plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
with pytest.raises(FunctionExecutionException):
await plugin.get(None)

Expand All @@ -59,7 +59,7 @@ async def test_post(mock_post):
mock_post.return_value.__aenter__.return_value.text.return_value = "Hello World !"
mock_post.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
arguments = KernelArguments(url="https://example.org/post", body="{message: 'Hello, world!'}")
response = await plugin.post(**arguments)
assert response == "Hello World !"
Expand All @@ -70,7 +70,7 @@ async def test_post_nobody(mock_post):
mock_post.return_value.__aenter__.return_value.text.return_value = "Hello World !"
mock_post.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
arguments = KernelArguments(url="https://example.org/post")
response = await plugin.post(**arguments)
assert response == "Hello World !"
Expand All @@ -81,7 +81,7 @@ async def test_put(mock_put):
mock_put.return_value.__aenter__.return_value.text.return_value = "Hello World !"
mock_put.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
arguments = KernelArguments(url="https://example.org/put", body="{message: 'Hello, world!'}")
response = await plugin.put(**arguments)
assert response == "Hello World !"
Expand All @@ -92,7 +92,7 @@ async def test_put_nobody(mock_put):
mock_put.return_value.__aenter__.return_value.text.return_value = "Hello World !"
mock_put.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
arguments = KernelArguments(url="https://example.org/put")
response = await plugin.put(**arguments)
assert response == "Hello World !"
Expand All @@ -103,7 +103,7 @@ async def test_delete(mock_delete):
mock_delete.return_value.__aenter__.return_value.text.return_value = "Hello World !"
mock_delete.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin()
plugin = HttpPlugin(allow_all_domains=True)
arguments = KernelArguments(url="https://example.org/delete")
response = await plugin.delete(**arguments)
assert response == "Hello World !"
Expand Down Expand Up @@ -179,8 +179,8 @@ async def test_allowed_domains_case_insensitive():


async def test_allowed_domains_none_allows_all():
"""Test that when allowed_domains is None, all domains are allowed."""
plugin = HttpPlugin() # allowed_domains defaults to None
"""Test that when allow_all_domains is True, all domains are allowed."""
plugin = HttpPlugin(allow_all_domains=True)
assert plugin._is_uri_allowed("https://any-domain.com/path") is True
assert plugin._is_uri_allowed("https://another-domain.org/path") is True

Expand Down Expand Up @@ -214,3 +214,139 @@ async def test_allowed_domains_exact_subdomain_match():
assert plugin._is_uri_allowed("https://sub.example.com/path") is True
assert plugin._is_uri_allowed("https://example.com/path") is False
assert plugin._is_uri_allowed("https://other.example.com/path") is False


# Security regression tests


async def test_default_constructor_denies_all():
"""Test that default HttpPlugin() denies all requests (issue 115285)."""
plugin = HttpPlugin()
assert plugin._is_uri_allowed("https://example.com/path") is False
assert plugin._is_uri_allowed("https://any-domain.com/path") is False


@pytest.mark.parametrize("method", ["get", "post", "put", "delete"])
async def test_default_constructor_blocks_requests(method):
"""Test that default HttpPlugin() blocks all HTTP methods (issue 115285)."""
plugin = HttpPlugin()
with pytest.raises(FunctionExecutionException, match="Sending requests to the provided location is not allowed"):
if method in ["post", "put"]:
await getattr(plugin, method)(url="https://example.com/path", body={"key": "value"})
else:
await getattr(plugin, method)(url="https://example.com/path")


@patch("aiohttp.ClientSession.get")
async def test_allow_all_domains_flag(mock_get):
"""Test that allow_all_domains=True permits requests to any domain."""
mock_get.return_value.__aenter__.return_value.text.return_value = "OK"
mock_get.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin(allow_all_domains=True)
response = await plugin.get("https://any-domain.com/path")
assert response == "OK"


@patch("aiohttp.ClientSession.get")
async def test_redirects_disabled_with_allowed_domains(mock_get):
"""Test that redirects are disabled when allowed_domains is set (issue 115048)."""
mock_get.return_value.__aenter__.return_value.text.return_value = "OK"
mock_get.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin(allowed_domains={"example.com"})
await plugin.get("https://example.com/path")

_, kwargs = mock_get.call_args
assert kwargs["allow_redirects"] is False


@patch("aiohttp.ClientSession.post")
async def test_redirects_disabled_for_post_with_allowed_domains(mock_post):
"""Test that redirects are disabled for POST when allowed_domains is set."""
mock_post.return_value.__aenter__.return_value.text.return_value = "OK"
mock_post.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin(allowed_domains={"example.com"})
await plugin.post("https://example.com/path", body={"key": "value"})

_, kwargs = mock_post.call_args
assert kwargs["allow_redirects"] is False


@patch("aiohttp.ClientSession.put")
async def test_redirects_disabled_for_put_with_allowed_domains(mock_put):
"""Test that redirects are disabled for PUT when allowed_domains is set."""
mock_put.return_value.__aenter__.return_value.text.return_value = "OK"
mock_put.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin(allowed_domains={"example.com"})
await plugin.put("https://example.com/path", body={"key": "value"})

_, kwargs = mock_put.call_args
assert kwargs["allow_redirects"] is False


@patch("aiohttp.ClientSession.delete")
async def test_redirects_disabled_for_delete_with_allowed_domains(mock_delete):
"""Test that redirects are disabled for DELETE when allowed_domains is set."""
mock_delete.return_value.__aenter__.return_value.text.return_value = "OK"
mock_delete.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin(allowed_domains={"example.com"})
await plugin.delete("https://example.com/path")

_, kwargs = mock_delete.call_args
assert kwargs["allow_redirects"] is False


@patch("aiohttp.ClientSession.get")
async def test_redirects_allowed_with_allow_all_domains(mock_get):
"""Test that redirects are still allowed when allow_all_domains is True."""
mock_get.return_value.__aenter__.return_value.text.return_value = "OK"
mock_get.return_value.__aenter__.return_value.status = 200

plugin = HttpPlugin(allow_all_domains=True)
await plugin.get("https://example.com/path")

_, kwargs = mock_get.call_args
assert kwargs["allow_redirects"] is True
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only verifies allow_redirects=True for GET. The corresponding 'redirects disabled' tests cover all four methods (GET/POST/PUT/DELETE) individually, but the 'redirects allowed' path only covers GET. Each HTTP method in http_plugin.py computes allow_redirects on its own line, so a copy-paste error in post(), put(), or delete() would go undetected. Consider adding parametrized or individual tests asserting allow_redirects is True for the other three methods when allow_all_domains=True.



@pytest.mark.parametrize("scheme", ["file", "ftp", "gopher", "data"])
async def test_disallowed_schemes_blocked(scheme):
"""Test that non-HTTP schemes are blocked."""
plugin = HttpPlugin(allow_all_domains=True)
assert plugin._is_uri_allowed(f"{scheme}://example.com/path") is False


@pytest.mark.parametrize("scheme", ["file", "ftp", "gopher"])
@pytest.mark.parametrize("method", ["get", "post", "put", "delete"])
async def test_disallowed_schemes_blocked_all_methods(scheme, method):
"""Test that non-HTTP schemes are blocked for all HTTP methods."""
plugin = HttpPlugin(allow_all_domains=True)
with pytest.raises(FunctionExecutionException, match="Sending requests to the provided location is not allowed"):
if method in ["post", "put"]:
await getattr(plugin, method)(url=f"{scheme}://example.com/path", body={"key": "value"})
else:
await getattr(plugin, method)(url=f"{scheme}://example.com/path")


async def test_http_scheme_allowed():
"""Test that both http and https schemes are allowed."""
plugin = HttpPlugin(allow_all_domains=True)
assert plugin._is_uri_allowed("http://example.com/path") is True
assert plugin._is_uri_allowed("https://example.com/path") is True


async def test_empty_hostname_rejected():
"""Test that URLs with empty hostnames are rejected."""
plugin = HttpPlugin(allow_all_domains=True)
assert plugin._is_uri_allowed("http://") is False
assert plugin._is_uri_allowed("https://") is False


async def test_allow_all_domains_with_allowed_domains_allows_redirects():
"""Test that redirects are allowed when both allow_all_domains and allowed_domains are set."""
plugin = HttpPlugin(allowed_domains={"example.com"}, allow_all_domains=True)
assert plugin._is_uri_allowed("https://any-domain.com/path") is True
Loading