Skip to content
141 changes: 110 additions & 31 deletions src/host/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <netinet/in.h>
#include <optional>
#include <unistd.h>

namespace asynchost
{
Expand Down Expand Up @@ -192,6 +193,13 @@ namespace asynchost
}
else
{
if (!set_connection_timeout_on_uv_handle())
{
assert_status(BINDING, BINDING_FAILED);
behaviour->on_bind_failed();
return;
}

assert_status(BINDING, CONNECTING_RESOLVING);
if (addr_current != nullptr)
{
Expand Down Expand Up @@ -411,37 +419,8 @@ namespace asynchost
return false;
}

if (is_client)
{
uv_os_sock_t sock = 0;
if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
{
LOG_FAIL_FMT(
"socket creation failed: {}",
std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
return false;
}

if (connection_timeout.has_value())
{
const unsigned int ms = connection_timeout->count();
const auto ret =
setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms, sizeof(ms));
if (ret != 0)
{
LOG_FAIL_FMT(
"Failed to set socket option (TCP_USER_TIMEOUT): {}",
std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
return false;
}
}

if ((rc = uv_tcp_open(&uv_handle, sock)) < 0)
{
LOG_FAIL_FMT("uv_tcp_open failed: {}", uv_strerror(rc));
return false;
}
}
// Client socket creation is deferred to connect_resolved(), where
// the resolved address family (AF_INET or AF_INET6) is known.

if ((rc = uv_tcp_keepalive(&uv_handle, 1, 30)) < 0)
{
Expand Down Expand Up @@ -544,6 +523,50 @@ namespace asynchost

bool connect_resolved()
{
// Create the client socket with the correct address family, but only
// if client_bind() hasn't already created one via uv_tcp_bind().
if (is_client && !client_host.has_value() && addr_current != nullptr)
{
Comment thread
achamayou marked this conversation as resolved.
int rc = 0;
Comment thread
achamayou marked this conversation as resolved.
uv_os_fd_t existing_fd = {};
const auto uv_fileno_rc = uv_fileno(
reinterpret_cast<const uv_handle_t*>(&uv_handle), &existing_fd);
if (uv_fileno_rc < 0 && uv_fileno_rc != UV_EBADF)
{
LOG_FAIL_FMT(
"uv_fileno returned unexpected error while checking TCP handle "
"state: {}",
uv_strerror(uv_fileno_rc));
return false;
}

if (uv_fileno_rc == UV_EBADF)
{
const int family = addr_current->ai_family;
uv_os_sock_t sock = 0;
if ((sock = socket(family, SOCK_STREAM, IPPROTO_TCP)) == -1)
{
LOG_FAIL_FMT(
"socket creation failed: {}",
std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
return false;
}

if (!set_connection_timeout(sock))
{
close_socket_before_uv_ownership(sock);
return false;
}

if ((rc = uv_tcp_open(&uv_handle, sock)) < 0)
{
LOG_FAIL_FMT("uv_tcp_open failed: {}", uv_strerror(rc));
close_socket_before_uv_ownership(sock);
return false;
}
}
}

auto* req = new uv_connect_t; // NOLINT(cppcoreguidelines-owning-memory)
int rc = 0;

Expand Down Expand Up @@ -573,6 +596,62 @@ namespace asynchost
return false;
}

bool set_connection_timeout(uv_os_sock_t sock)
{
if (!connection_timeout.has_value())
{
return true;
}

const unsigned int ms = connection_timeout->count();
const auto ret =
setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms, sizeof(ms));
if (ret != 0)
{
const auto err = errno;
LOG_FAIL_FMT(
"Failed to set socket option (TCP_USER_TIMEOUT): {}",
std::strerror(err)); // NOLINT(concurrency-mt-unsafe)
return false;
}

return true;
}

static void close_socket_before_uv_ownership(uv_os_sock_t sock)
{
// Socket ownership is transferred to libuv only if uv_tcp_open succeeds.
// Before that, this socket must be closed by the caller.
// This is best-effort cleanup on an existing failure path: we only log
// close() errors (including EINTR). We intentionally do not retry
// close(), since retrying may close a reused fd.
const auto rc = ::close(sock);
if (rc != 0)
{
const auto err = errno;
LOG_FAIL_FMT(
"Failed to close socket {}: {}",
sock,
std::strerror(err)); // NOLINT(concurrency-mt-unsafe)
}
}

bool set_connection_timeout_on_uv_handle()
{
uv_os_fd_t existing_fd = {};
const auto rc = uv_fileno(
reinterpret_cast<const uv_handle_t*>(&uv_handle), &existing_fd);
if (rc < 0)
{
LOG_FAIL_FMT(
"uv_fileno failed while applying TCP_USER_TIMEOUT: {}",
uv_strerror(rc));
return false;
}

return set_connection_timeout(existing_fd);
}

void assert_status(Status from, Status to)
{
if (status != from)
Expand Down
8 changes: 6 additions & 2 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -2427,8 +2427,7 @@ namespace ccf
// numeric, but at least the final component (TLD) must not be
// all-numeric. So this distinguishes "1.2.3.4" (an IP address) from
// "1.2.3.c4m" (a DNS name). "1.2.3." is invalid for either, and will
// throw. Attempts to handle IPv6 by also splitting on ':', but this is
// untested.
// throw. Handles IPv6 by splitting on ':' after splitting on '.'.
const auto final_component =
ccf::nonstd::split(ccf::nonstd::split(hostname, ".").back(), ":")
.back();
Expand Down Expand Up @@ -2459,6 +2458,11 @@ namespace ccf
for (const auto& [_, interface] : config.network.rpc_interfaces)
{
auto host = split_net_address(interface.published_address).first;
// Strip brackets from IPv6 addresses (e.g. "[::1]" -> "::1")
if (host.size() >= 2 && host.front() == '[' && host.back() == ']')
{
host = host.substr(1, host.size() - 2);
}
sans.push_back({host, is_ip(host)});
}
return sans;
Expand Down
36 changes: 36 additions & 0 deletions tests/e2e_common_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.network
import infra.interfaces
from ccf.ledger import NodeStatus
import http
import random
import socket
import suite.test_requirements as reqs


Expand Down Expand Up @@ -320,3 +322,37 @@ def run(args):
test_node_ids(network, args)
test_large_messages(network, args)
test_readiness(network, args)


def run_ipv6(args):
# Check if IPv6 loopback is available before attempting to start nodes.
# Some CI environments disable IPv6, in which case this test is skipped.
try:
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
s.bind(("::1", 0))
except (OSError, socket.error):
LOG.warning("IPv6 loopback (::1) is not available, skipping IPv6 test")
return

# Set each RPC interface host to the IPv6 loopback address directly,
# so the setting is isolated to this test (no environment variable).
# Ports are dynamically assigned, so sharing ::1 across nodes is fine.
for host_spec in args.nodes:
for rpc_interface in host_spec.rpc_interfaces.values():
rpc_interface.host = "::1"

with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, pdb=args.pdb
) as network:
network.start_and_open(args)

primary, _ = network.find_primary()
primary_interface = primary.host.rpc_interfaces[
infra.interfaces.PRIMARY_RPC_INTERFACE
]
assert (
":" in primary_interface.host
), f"Expected IPv6 address, got {primary_interface.host}"
LOG.info(f"Confirmed primary is using IPv6 address: {primary_interface.host}")

test_primary(network, args)
7 changes: 7 additions & 0 deletions tests/e2e_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2635,6 +2635,13 @@ def run_parsing_errors(args):
nodes=infra.e2e_args.max_nodes(cr.args, f=0),
)

cr.add(
"common_ipv6",
e2e_common_endpoints.run_ipv6,
package="samples/apps/logging/logging",
nodes=infra.e2e_args.max_nodes(cr.args, f=0),
)

# Run illegal traffic tests in separate runners, to reduce total serial runtime
cr.add(
"js_illegal",
Expand Down
15 changes: 11 additions & 4 deletions tests/infra/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,17 @@ def __init__(
if interface_name == infra.interfaces.PRIMARY_RPC_INTERFACE:
if rpc_interface.protocol == "local":
if not self.major_version or self.major_version > 1:
self.node_client_host = str(
ipaddress.ip_address(BASE_NODE_CLIENT_HOST)
+ self.local_node_id
)
if ":" in rpc_interface.host:
# Pure IPv6 addresses (e.g. ::1) are not
# compatible with the IPv4-based client
# interface used for partition simulation.
# Skip client interface binding for IPv6.
self.node_client_host = None
else:
self.node_client_host = str(
ipaddress.ip_address(BASE_NODE_CLIENT_HOST)
+ self.local_node_id
)
else:
assert False, f"{rpc_interface.protocol} is not 'local://'"

Expand Down