Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions tests/consensus/devnet/networking/test_reqresp_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,73 @@ def test_response_success_all_ff(networking_codec: NetworkingCodecTestFiller) ->
codec_name="reqresp_response",
input={"responseCode": 0, "sszData": "0x" + "ff" * 32},
)


# --- End-to-end protocol messages ---
#
# Full stack: SSZ container → wire encode → wire decode → SSZ decode.
# These use the actual Lean consensus message types, not arbitrary bytes.

STATUS_SSZ = (
"0x01010101010101010101010101010101010101010101010101010101010101016400"
"000000000000020202020202020202020202020202020202020202020202020202020202"
"02020296000000000000"
)
"""Status(finalized=Checkpoint(0x01..01, slot=100), head=Checkpoint(0x02..02, slot=150))."""

BLOCKS_BY_ROOT_SSZ = (
"0x04000000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
)
"""BlocksByRootRequest with two roots (0xaa..aa, 0xbb..bb)."""

BLOCKS_BY_ROOT_EMPTY_SSZ = "0x04000000"
"""BlocksByRootRequest with empty root list (4-byte SSZ offset only)."""


def test_request_status(networking_codec: NetworkingCodecTestFiller) -> None:
"""Status request through the full wire pipeline. Protocol: /leanconsensus/req/status/1."""
networking_codec(
codec_name="reqresp_request",
input={"sszData": STATUS_SSZ},
)


def test_response_status(networking_codec: NetworkingCodecTestFiller) -> None:
"""Status SUCCESS response through the full wire pipeline."""
networking_codec(
codec_name="reqresp_response",
input={"responseCode": 0, "sszData": STATUS_SSZ},
)


def test_request_blocks_by_root(networking_codec: NetworkingCodecTestFiller) -> None:
"""BlocksByRoot request with two roots. Protocol: /leanconsensus/req/blocks_by_root/1."""
networking_codec(
codec_name="reqresp_request",
input={"sszData": BLOCKS_BY_ROOT_SSZ},
)


def test_request_blocks_by_root_empty(networking_codec: NetworkingCodecTestFiller) -> None:
"""BlocksByRoot request with empty root list. Minimal 4-byte SSZ payload."""
networking_codec(
codec_name="reqresp_request",
input={"sszData": BLOCKS_BY_ROOT_EMPTY_SSZ},
)


def test_request_snappy_chunk_boundary(networking_codec: NetworkingCodecTestFiller) -> None:
"""Request at Snappy chunk boundary (65536 bytes). Exercises multi-chunk framing."""
networking_codec(
codec_name="reqresp_request",
input={"sszData": "0x" + "ab" * 65536},
)


def test_request_multi_chunk(networking_codec: NetworkingCodecTestFiller) -> None:
"""Request spanning 2+ Snappy chunks (65537 bytes)."""
networking_codec(
codec_name="reqresp_request",
input={"sszData": "0x" + "cd" * 65537},
)
Loading