Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 138 additions & 4 deletions test/test_uri_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from pymongo.errors import ConfigurationError, InvalidURI
from pymongo.synchronous.uri_parser import parse_uri
from pymongo.uri_parser_shared import (
_unquoted_percent,
parse_host,
parse_ipv6_literal_host,
parse_userinfo,
split_hosts,
split_options,
Expand Down Expand Up @@ -462,7 +465,6 @@ def test_tlsinsecure_simple(self):
"tlsInsecure": True,
"tlsDisableOCSPEndpointCheck": True,
}
print(parse_uri(uri)["options"])
self.assertEqual(res, parse_uri(uri)["options"])

def test_normalize_options(self):
Expand Down Expand Up @@ -555,9 +557,141 @@ def test_port_with_whitespace(self):
with self.assertRaisesRegex(ValueError, r"Port contains whitespace character: '\\n'"):
parse_uri("mongodb://localhost:27\n017")

def test_parse_uri_options_type(self):
opts = parse_uri("mongodb://localhost:27017")["options"]
self.assertIsInstance(opts, dict)
def test_unquoted_percent(self):
self.assertFalse(_unquoted_percent(""))
self.assertFalse(_unquoted_percent("no_percent_here"))
self.assertFalse(_unquoted_percent("%25")) # %25 decodes to literal "%"
self.assertFalse(_unquoted_percent("%40")) # %40 decodes to "@"
self.assertFalse(_unquoted_percent("user%40domain.com"))
self.assertFalse(_unquoted_percent("%2B")) # %2B decodes to "+"
self.assertFalse(_unquoted_percent("%E2%85%A8")) # multi-byte sequence
self.assertFalse(_unquoted_percent("%2525")) # double-encoded: %25 -> %
self.assertTrue(_unquoted_percent("%foo")) # 'o' is not a hex digit
self.assertTrue(_unquoted_percent("50%off")) # 'o' is not a hex digit
self.assertTrue(_unquoted_percent("100%")) # trailing bare %

def test_parse_ipv6_literal_host_direct(self):
self.assertEqual(("::1", 27017), parse_ipv6_literal_host("[::1]", 27017))
self.assertEqual(("::1", None), parse_ipv6_literal_host("[::1]", None))
# IPv6 with explicit port returns port as a string (int conversion
# happens later in parse_host).
host, port = parse_ipv6_literal_host("[::1]:27018", 27017)
self.assertEqual("::1", host)
self.assertEqual("27018", port)
full_ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
host, port = parse_ipv6_literal_host(f"[{full_ipv6}]", 27017)
self.assertEqual(full_ipv6, host)
self.assertEqual(27017, port)
self.assertRaises(ValueError, parse_ipv6_literal_host, "[::1", 27017)

def test_parse_host_case_normalization(self):
self.assertEqual(("localhost", 27017), parse_host("LOCALHOST:27017"))
self.assertEqual(("example.com", 27017), parse_host("Example.COM"))
self.assertEqual(("example.com", 27017), parse_host("EXAMPLE.COM:27017"))
self.assertEqual(("192.168.1.1", 27017), parse_host("192.168.1.1:27017"))
self.assertEqual(("::1", 27017), parse_host("[::1]:27017"))

def test_parse_host_port_boundaries(self):
self.assertEqual(("localhost", 1), parse_host("localhost:1"))
self.assertEqual(("localhost", 65535), parse_host("localhost:65535"))
self.assertRaises(ValueError, parse_host, "localhost:0")
self.assertRaises(ValueError, parse_host, "localhost:65536")

def test_tls_option_conflicts(self):
self.assertRaises(
InvalidURI, split_options, "tlsInsecure=true&tlsAllowInvalidCertificates=true"
)
# The conflict is based on presence, not value.
self.assertRaises(
InvalidURI, split_options, "tlsInsecure=true&tlsAllowInvalidHostnames=false"
)
self.assertRaises(
InvalidURI, split_options, "tlsInsecure=true&tlsDisableOCSPEndpointCheck=true"
)
self.assertRaises(
InvalidURI,
split_options,
"tlsAllowInvalidCertificates=true&tlsDisableOCSPEndpointCheck=true",
)
self.assertRaises(InvalidURI, split_options, "ssl=true&tls=false")
self.assertRaises(InvalidURI, split_options, "ssl=false&tls=true")
self.assertTrue(split_options("ssl=true&tls=true"))
self.assertTrue(split_options("ssl=false&tls=false"))

def test_split_options_mixed_delimiters(self):
self.assertRaises(InvalidURI, split_options, "ssl=true&tls=true;appname=foo")
self.assertRaises(InvalidURI, split_options, "appname=foo;ssl=true&tls=true")

def test_split_options_duplicate_warning(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
split_options("appname=foo&appname=bar")
self.assertEqual(1, len(w))
self.assertIn("Duplicate URI option", str(w[0].message))

def test_split_options_empty_authsource(self):
self.assertRaises(InvalidURI, split_options, "authSource=")

def test_check_options_conflicts(self):
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb://host1,host2/?directConnection=true",
)
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb://host1,host2/?loadBalanced=true",
)
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb://localhost/?directConnection=true&loadBalanced=true",
)
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb://localhost/?loadBalanced=true&replicaSet=rs0",
)

def test_validate_uri_edge_cases(self):
self.assertRaises(InvalidURI, parse_uri, "mongodb://")
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb://localhost/?srvServiceName=myService",
)
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb://localhost/?srvMaxHosts=1",
)
self.assertRaises(InvalidURI, parse_uri, "mongodb://localhost/%24db")
self.assertRaises(InvalidURI, parse_uri, "mongodb://localhost/my%20db")

def test_validate_uri_srv_structure(self):
with patch("pymongo.uri_parser_shared._have_dnspython", return_value=True):
self.assertRaises(
InvalidURI,
parse_uri,
"mongodb+srv://host1.example.com,host2.example.com",
)
self.assertRaises(
InvalidURI,
parse_uri,
"mongodb+srv://host1.example.com:27017",
)
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb+srv://host1.example.com/?directConnection=true",
)
with patch("pymongo.uri_parser_shared._have_dnspython", return_value=False):
self.assertRaises(
ConfigurationError,
parse_uri,
"mongodb+srv://host1.example.com",
)


if __name__ == "__main__":
Expand Down
Loading