Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion copi.owasp.org/coveralls.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{
"skip_files": [
"lib/copi/migrations/",
"test/support/"
"test/support/",
"lib/copi/release.ex",
"lib/copi_web.ex",
"lib/copi_web/controllers/error_html.ex"

],
"coverage_options": {
Expand Down
7 changes: 6 additions & 1 deletion copi.owasp.org/lib/copi_web/live/game_live/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ defmodule CopiWeb.GameLive.Show do
def handle_info(%{topic: message_topic, event: "game:updated", payload: updated_game}, socket) do
cond do
topic(updated_game.id) == message_topic ->
{:noreply, assign(socket, :game, updated_game) |> assign(:requested_round, updated_game.rounds_played + 1)}
current_round = if updated_game.finished_at do
updated_game.rounds_played
else
updated_game.rounds_played + 1
end
{:noreply, assign(socket, :game, updated_game) |> assign(:requested_round, current_round)}
true ->
{:noreply, socket}
end
Expand Down
5 changes: 5 additions & 0 deletions copi.owasp.org/test/copi/ip_helper_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ defmodule Copi.IPHelperTest do
info = %{req_headers: [{:"x-forwarded-for", "10.2.3.4"}]}
assert IPHelper.get_ip_from_connect_info(info) == {10, 2, 3, 4}
end

test "handles x_headers as raw binary string" do
info = %{x_headers: "10.8.9.1"}
assert IPHelper.get_ip_from_connect_info(info) == {10, 8, 9, 1}
end
end

describe "get_ip_from_socket/1 (LiveView) - additional coverage" do
Expand Down
25 changes: 25 additions & 0 deletions copi.owasp.org/test/copi_web/live/game_live/show_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,30 @@ defmodule CopiWeb.GameLive.ShowTest do
:timer.sleep(50)
assert render(show_live) =~ game.name
end

test "handle_info sets requested_round to rounds_played for finished game", %{conn: conn, game: game} do
{:ok, _} =
Cornucopia.update_game(game, %{
started_at: DateTime.truncate(DateTime.utc_now(), :second),
finished_at: DateTime.truncate(DateTime.utc_now(), :second),
rounds_played: 3
})

# Use Game.find to get fully preloaded struct (same as real broadcasts)
{:ok, finished_game} = Cornucopia.Game.find(game.id)

{:ok, show_live, _html} = live(conn, "/games/#{finished_game.id}")

send(show_live.pid, %{
topic: "game:#{finished_game.id}",
event: "game:updated",
payload: finished_game
})

:timer.sleep(50)
# With the fix, requested_round = rounds_played = 3, template shows "Viewing round"
# With the bug, requested_round = rounds_played + 1 = 4, template shows "Round 4:"
assert render(show_live) =~ "Viewing round"
end
end
end
13 changes: 12 additions & 1 deletion copi.owasp.org/test/copi_web/live/player_live/show_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ defmodule CopiWeb.PlayerLive.ShowTest do
assert updated_game.rounds_played == 1
end

test "helper functions return expected values", %{conn: _conn, player: _player} do
test "helper functions return expected values", %{conn: _conn, player: player} do
alias CopiWeb.PlayerLive.Show

assert Show.ordered_cards([]) == []
Expand All @@ -132,9 +132,11 @@ defmodule CopiWeb.PlayerLive.ShowTest do
assert Show.card_played_in_round([], 1) == nil
assert Show.round_closed?(%{players: [], rounds_played: 0}) == true

# last_round? returns false when a player still has a nil-round card
player_with_unplayed = %{dealt_cards: [%{played_in_round: nil}]}
refute Show.last_round?(%{players: [player_with_unplayed], rounds_played: 0})

# last_round? returns true when all cards are played
player_all_played = %{dealt_cards: [%{played_in_round: 1}]}
assert Show.last_round?(%{players: [player_all_played], rounds_played: 0})

Expand Down Expand Up @@ -177,6 +179,7 @@ defmodule CopiWeb.PlayerLive.ShowTest do
Ecto.Changeset.change(game, started_at: DateTime.truncate(DateTime.utc_now(), :second))
)

# Card played in round 1 (current round) → round_open? = false
{:ok, card1} =
Cornucopia.create_card(%{
category: "C", value: "V3", description: "D", edition: "webapp",
Expand All @@ -185,6 +188,7 @@ defmodule CopiWeb.PlayerLive.ShowTest do
capec: [], safecode: [], owasp_mastg: [], owasp_masvs: []
})

# Unplayed card → last_round? = false (player still has nil-round card)
{:ok, card2} =
Cornucopia.create_card(%{
category: "C", value: "V4", description: "D", edition: "webapp",
Expand All @@ -207,6 +211,7 @@ defmodule CopiWeb.PlayerLive.ShowTest do

{:ok, updated_game} = Cornucopia.Game.find(game_id)
assert updated_game.rounds_played == 1
# last_round? = false because player still has unplayed card
assert updated_game.finished_at == nil
end

Expand All @@ -219,13 +224,15 @@ defmodule CopiWeb.PlayerLive.ShowTest do
Ecto.Changeset.change(game, started_at: DateTime.truncate(DateTime.utc_now(), :second))
)

# Player has exactly one card, played in round 1 → no nil-round cards remain
{:ok, card} =
Cornucopia.create_card(%{
category: "C", value: "V5", description: "D", edition: "webapp",
version: "2.2", external_id: "NR_LAST1", language: "en", misc: "m",
owasp_scp: [], owasp_devguide: [], owasp_asvs: [], owasp_appsensor: [],
capec: [], safecode: [], owasp_mastg: [], owasp_masvs: []
})

Copi.Repo.insert!(%Copi.Cornucopia.DealtCard{
player_id: player.id, card_id: card.id, played_in_round: 1
})
Expand All @@ -249,12 +256,14 @@ defmodule CopiWeb.PlayerLive.ShowTest do

{:ok, show_live, _html} = live(conn, "/games/#{game_id}/players/#{player.id}")

# No vote yet → should insert a continue vote
render_click(show_live, "toggle_continue_vote", %{})
:timer.sleep(100)

{:ok, updated_game} = Cornucopia.Game.find(game_id)
assert length(updated_game.continue_votes) == 1

# Vote exists → should delete it
render_click(show_live, "toggle_continue_vote", %{})
:timer.sleep(100)

Expand Down Expand Up @@ -284,12 +293,14 @@ defmodule CopiWeb.PlayerLive.ShowTest do

{:ok, show_live, _html} = live(conn, "/games/#{game_id}/players/#{player.id}")

# No vote yet → should insert a vote
render_click(show_live, "toggle_vote", %{"dealt_card_id" => to_string(dealt.id)})
:timer.sleep(100)

{:ok, updated_dealt} = Copi.Cornucopia.DealtCard.find(to_string(dealt.id))
assert length(updated_dealt.votes) == 1

# Vote exists → should delete it
render_click(show_live, "toggle_vote", %{"dealt_card_id" => to_string(dealt.id)})
:timer.sleep(100)

Expand Down
131 changes: 131 additions & 0 deletions tests/scripts/convert_capec_utest.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,5 +841,136 @@ def test_create_capec_pages_with_asvs_mapping(self, mock_create_folder, mock_fil
self.assertIn("V8.1.1", written_content)


class TestMainFlow(unittest.TestCase):
def _args(self):
return argparse.Namespace(
output_path=Path("/tmp/out"),
input_path=Path("/tmp/in.json"),
asvs_mapping=Path("/tmp/asvs.json"),
capec_to_asvs=Path("/tmp/map.yaml"),
asvs_version="5.0",
debug=False,
)

@patch("scripts.convert_capec.create_capec_pages")
@patch("scripts.convert_capec.load_capec_to_asvs_mapping", return_value={1: {"owasp_asvs": ["V1.1.1"]}})
@patch("scripts.convert_capec.load_json_file", side_effect=[{}, {"Requirements": []}])
@patch("scripts.convert_capec.create_folder")
@patch("scripts.convert_capec.empty_folder")
@patch("scripts.convert_capec.set_logging")
@patch("scripts.convert_capec.get_valid_version", return_value="5.0")
@patch("scripts.convert_capec.parse_arguments")
def test_main_returns_on_invalid_capec_data(
self,
mock_parse,
mock_get_version,
mock_set_logging,
mock_empty,
mock_create_folder,
mock_load_json,
mock_load_map,
mock_create_pages,
):
mock_parse.return_value = self._args()

with patch("scripts.convert_capec.sys.argv", ["convert_capec.py"]), self.assertLogs(
logging.getLogger(), logging.ERROR
) as log:
capec.main()

self.assertIn("Invalid CAPEC data structure", "\n".join(log.output))
mock_create_pages.assert_not_called()

@patch("scripts.convert_capec.create_capec_pages")
@patch("scripts.convert_capec.load_capec_to_asvs_mapping", return_value={1: {"owasp_asvs": ["V1.1.1"]}})
@patch("scripts.convert_capec.validate_json_data", return_value=True)
@patch("scripts.convert_capec.load_json_file", side_effect=[{"Attack_Pattern_Catalog": {}}, {}])
@patch("scripts.convert_capec.create_folder")
@patch("scripts.convert_capec.empty_folder")
@patch("scripts.convert_capec.set_logging")
@patch("scripts.convert_capec.get_valid_version", return_value="5.0")
@patch("scripts.convert_capec.parse_arguments")
def test_main_returns_when_asvs_mapping_missing(
self,
mock_parse,
mock_get_version,
mock_set_logging,
mock_empty,
mock_create_folder,
mock_load_json,
mock_validate,
mock_load_map,
mock_create_pages,
):
mock_parse.return_value = self._args()

with patch("scripts.convert_capec.sys.argv", ["convert_capec.py"]), self.assertLogs(
logging.getLogger(), logging.ERROR
) as log:
capec.main()

self.assertIn("Failed to load ASVS mapping data", "\n".join(log.output))
mock_create_pages.assert_not_called()

@patch("scripts.convert_capec.create_capec_pages")
@patch("scripts.convert_capec.load_capec_to_asvs_mapping", return_value={})
@patch("scripts.convert_capec.validate_json_data", return_value=True)
@patch("scripts.convert_capec.load_json_file", side_effect=[{"Attack_Pattern_Catalog": {}}, {"Requirements": []}])
@patch("scripts.convert_capec.create_folder")
@patch("scripts.convert_capec.empty_folder")
@patch("scripts.convert_capec.set_logging")
@patch("scripts.convert_capec.get_valid_version", return_value="5.0")
@patch("scripts.convert_capec.parse_arguments")
def test_main_returns_when_capec_map_missing(
self,
mock_parse,
mock_get_version,
mock_set_logging,
mock_empty,
mock_create_folder,
mock_load_json,
mock_validate,
mock_load_map,
mock_create_pages,
):
mock_parse.return_value = self._args()

with patch("scripts.convert_capec.sys.argv", ["convert_capec.py"]), self.assertLogs(
logging.getLogger(), logging.ERROR
) as log:
capec.main()

self.assertIn("Failed to load CAPEC to ASVS mapping", "\n".join(log.output))
mock_create_pages.assert_not_called()

@patch("scripts.convert_capec.create_capec_pages")
@patch("scripts.convert_capec.load_capec_to_asvs_mapping", return_value={1: {"owasp_asvs": ["V1.1.1"]}})
@patch("scripts.convert_capec.validate_json_data", return_value=True)
@patch("scripts.convert_capec.load_json_file", side_effect=[{"Attack_Pattern_Catalog": {}}, {"Requirements": []}])
@patch("scripts.convert_capec.create_folder")
@patch("scripts.convert_capec.empty_folder")
@patch("scripts.convert_capec.set_logging")
@patch("scripts.convert_capec.get_valid_version", return_value="5.0")
@patch("scripts.convert_capec.parse_arguments")
def test_main_success_calls_create_pages(
self,
mock_parse,
mock_get_version,
mock_set_logging,
mock_empty,
mock_create_folder,
mock_load_json,
mock_validate,
mock_load_map,
mock_create_pages,
):
mock_parse.return_value = self._args()

with patch("scripts.convert_capec.sys.argv", ["convert_capec.py"]):
capec.main()

mock_create_pages.assert_called_once()


if __name__ == "__main__":
unittest.main()
90 changes: 89 additions & 1 deletion tests/scripts/convert_utest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2278,7 +2278,6 @@ def test_linux_uses_shutil_which(self, mock_which, mock_platform):


class TestConvertWithLibreOffice(unittest.TestCase):

@patch("scripts.convert._get_libreoffice_bin", return_value=None)
def test_no_binary(self, mock_bin):
result = c._convert_with_libreoffice("file.docx", "out.pdf")
Expand Down Expand Up @@ -2360,5 +2359,94 @@ def test_real_validate_command_args_execution(self, mock_run, mock_paths, mock_b
self.assertIn(result, [True, False])


class TestPdfSecurityHelpers(unittest.TestCase):
def setUp(self) -> None:
self.original_base = c.convert_vars.BASE_PATH
self.original_args = getattr(c.convert_vars, "args", None)
c.convert_vars.args = argparse.Namespace(debug=False)

def tearDown(self) -> None:
c.convert_vars.BASE_PATH = self.original_base
if self.original_args is not None:
c.convert_vars.args = self.original_args

def test_validate_file_paths_rejects_missing_source(self) -> None:
with mock.patch("scripts.convert.os.path.isfile", return_value=False):
ok, msg, out = c._validate_file_paths("missing.odt", "out.pdf")
self.assertFalse(ok)
self.assertIn("Source file does not exist", msg)
self.assertEqual(out, "")

def test_validate_file_paths_rejects_missing_output_dir(self) -> None:
with mock.patch("scripts.convert.os.path.isfile", return_value=True), mock.patch(
"scripts.convert.os.path.isdir", return_value=False
):
ok, msg, out = c._validate_file_paths("in.odt", "bad/out.pdf")
self.assertFalse(ok)
self.assertIn("Output directory does not exist", msg)
self.assertEqual(out, "")

def test_validate_file_paths_rejects_outside_base(self) -> None:
c.convert_vars.BASE_PATH = os.path.join("C:", "repo", "base")
with mock.patch("scripts.convert.os.path.isfile", return_value=True), mock.patch(
"scripts.convert.os.path.isdir", return_value=True
), mock.patch("scripts.convert.os.path.abspath", side_effect=lambda p: p):
ok, msg, out = c._validate_file_paths("C:/outside/source.odt", "C:/outside/out.pdf")
self.assertFalse(ok)
self.assertIn("outside base directory", msg)
self.assertEqual(out, "")

def test_validate_command_args_blocks_dangerous(self) -> None:
with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
valid = c._validate_command_args(["libreoffice", "evil&arg"])
self.assertFalse(valid)
self.assertIn("Potentially dangerous character", log.output[0])

def test_convert_with_libreoffice_handles_invalid_validated_paths(self) -> None:
with mock.patch("scripts.convert._get_libreoffice_bin", return_value="/usr/bin/libreoffice"), mock.patch(
"scripts.convert._validate_file_paths", return_value=(False, "bad path", "")
):
with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
ok = c._convert_with_libreoffice("in.odt", "out.pdf")
self.assertFalse(ok)
self.assertIn("bad path", log.output[0])

def test_convert_with_libreoffice_handles_timeout(self) -> None:
c.convert_vars.BASE_PATH = os.path.join("C:", "repo", "base")
with mock.patch("scripts.convert._get_libreoffice_bin", return_value="/usr/bin/libreoffice"), mock.patch(
"scripts.convert._validate_file_paths", return_value=(True, "C:/repo/base/in.odt", "C:/repo/base/out")
), mock.patch("scripts.convert._validate_command_args", return_value=True), mock.patch(
"scripts.convert.os.makedirs"
), mock.patch(
"scripts.convert.subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="lo", timeout=1)
):
with self.assertLogs(logging.getLogger(), logging.WARNING) as log:
ok = c._convert_with_libreoffice("in.odt", "out.pdf")
self.assertFalse(ok)
self.assertIn("timed out", log.output[0])

def test_cleanup_temp_file_swallows_oserror(self) -> None:
c.convert_vars.args = argparse.Namespace(debug=False)
with mock.patch("scripts.convert.os.remove", side_effect=OSError("busy")):
c._cleanup_temp_file("tmp.docx")

def test_rename_libreoffice_output_replaces_existing_destination(self) -> None:
with tempfile.TemporaryDirectory() as td:
source_filename = os.path.join(td, "guide.odt")
default_out = os.path.join(td, "guide.pdf")
desired_out = os.path.join(td, "final.pdf")
with open(source_filename, "w", encoding="utf-8") as f:
f.write("src")
with open(default_out, "w", encoding="utf-8") as f:
f.write("default")
with open(desired_out, "w", encoding="utf-8") as f:
f.write("old")

c._rename_libreoffice_output(source_filename, desired_out)

self.assertTrue(os.path.isfile(desired_out))
self.assertFalse(os.path.isfile(default_out))


if __name__ == "__main__":
unittest.main()
Loading