Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion application/cmd/cre_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node:
defs.Standard.__name__,
defs.Code.__name__,
defs.Tool.__name__,
defs.Attack.__name__,
]:
# if a node links another node it is likely that a writer wants to reference something
# in that case, find which of the two nodes has at least one CRE attached to it and link both to the parent CRE
Expand Down Expand Up @@ -183,6 +184,7 @@ def parse_file(
defs.Credoctypes.Standard.value,
defs.Credoctypes.Code.value,
defs.Credoctypes.Tool.value,
defs.Credoctypes.Attack.value,
):
# document = defs.Standard(**contents)
doctype = contents.get("doctype")
Expand All @@ -192,7 +194,11 @@ def parse_file(
else (
defs.Code
if doctype == defs.Credoctypes.Code.value
else defs.Tool if doctype == defs.Credoctypes.Tool.value else None
else (
defs.Tool
if doctype == defs.Credoctypes.Tool.value
else defs.Attack
)
)
)
document = from_dict(
Expand Down
29 changes: 28 additions & 1 deletion application/database/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1468,7 +1468,11 @@ def add_node(
logger.info(
f"knew of node {entry.name}:{entry.section_id}:{entry.section}:{entry.link} ,updating"
)
if node.section and node.section != entry.section:
if (
hasattr(node, "section")
and node.section
and node.section != entry.section
):
entry.section = node.section
entry.link = node.hyperlink
self.session.commit()
Expand Down Expand Up @@ -1924,6 +1928,8 @@ def dbNodeFromNode(doc: cre_defs.Node) -> Optional[Node]:
return dbNodeFromCode(doc)
elif doc.doctype == cre_defs.Credoctypes.Tool:
return dbNodeFromTool(doc)
elif doc.doctype == cre_defs.Credoctypes.Attack:
return dbNodeFromAttack(doc)
else:
return None

Expand Down Expand Up @@ -2015,6 +2021,13 @@ def nodeFromDB(dbnode: Node) -> cre_defs.Node:
tags=tags,
description=dbnode.description,
)
elif dbnode.ntype == cre_defs.Attack.__name__:
return cre_defs.Attack(
name=dbnode.name,
hyperlink=dbnode.link,
tags=tags,
description=dbnode.description,
)
else:
raise ValueError(
f"Db node {dbnode.name} has an unrecognised ntype {dbnode.ntype}"
Expand Down Expand Up @@ -2114,3 +2127,17 @@ def gap_analysis(
)
logger.info(f"stored gapa analysis for {'>>>'.join(node_names)}, successfully")
return (node_names, grouped_paths, extra_paths_dict)


def dbNodeFromAttack(attack: cre_defs.Node) -> Node:
attack = cast(cre_defs.Attack, attack)
tags = ""
if attack.tags:
tags = ",".join(attack.tags)
return Node(
name=attack.name,
ntype=attack.doctype.value,
tags=tags,
description=attack.description,
link=attack.hyperlink,
)
6 changes: 6 additions & 0 deletions application/defs/cre_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class Credoctypes(str, Enum, metaclass=EnumMetaWithContains):
Standard = "Standard"
Tool = "Tool"
Code = "Code"
Attack = "Attack"

@staticmethod
def from_str(typ: str) -> "Credoctypes":
Expand Down Expand Up @@ -526,3 +527,8 @@ def __hash__(self) -> int:
@dataclass(eq=False)
class Code(Node):
doctype: Credoctypes = Credoctypes.Code


@dataclass(eq=False)
class Attack(Node):
doctype: Credoctypes = Credoctypes.Attack
69 changes: 69 additions & 0 deletions application/manual_seed_attacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
import sys

# Ensure application matches the import path
sys.path.append(os.getcwd())

from application.database import db
from application.defs import cre_defs as defs
from application.cmd.cre_main import db_connect
from application.config import CMDConfig
from application import create_app


def seed_attacks():
# Database path
db_path = os.path.abspath("standards_cache.sqlite")
print(f"Connecting to DB at {db_path}...")

# Setup context
conf = CMDConfig(db_uri=db_path)
app = create_app(conf=conf)
app_context = app.app_context()
app_context.push()

collection = db.Node_collection()

# Define Attacks
attacks = [
defs.Attack(
name="Path Traversal",
hyperlink="https://owasp.org/www-community/attacks/Path_Traversal",
tags=["OWASP", "Attack"],
),
defs.Attack(
name="SQL Injection",
hyperlink="https://owasp.org/www-community/attacks/SQL_Injection",
tags=["OWASP", "Attack"],
),
]

# Register
print("Seeding attacks...")
for attack in attacks:
# Idempotency Check
existing = collection.get_nodes(name=attack.name)
# Filter for existing attacks with same hyperlink to be precise
if existing and any(n.hyperlink == attack.hyperlink for n in existing):
print(f" Skipping existing: {attack.name}")
continue

db_node = collection.add_node(attack)
print(f" Added: {attack.name} (ID: {db_node.id})")

# Verification
print("\nVerifying...")
nodes = collection.get_nodes(name="SQL Injection", ntype=defs.Attack.__name__)
found = False
for n in nodes:
if n.doctype == defs.Credoctypes.Attack:
print(f"✅ Found Attack: {n.name} ({n.hyperlink}) - Type: {n.doctype}")
found = True

if not found:
print("❌ Verification Failed: SQL Injection Attack node not found!")
sys.exit(1)


if __name__ == "__main__":
seed_attacks()