Skip to content

Commit 55fff43

Browse files
committed
Add support for ClamAV entries collection
Signed-off-by: ziad hany <ziadhany2016@gmail.com>
1 parent be89117 commit 55fff43

8 files changed

Lines changed: 399 additions & 0 deletions

File tree

vulnerabilities/improvers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from vulnerabilities.pipelines import flag_ghost_packages
2020
from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline
2121
from vulnerabilities.pipelines import remove_duplicate_advisories
22+
from vulnerabilities.pipelines.v2_improvers import clamv_rules
2223
from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2
2324
from vulnerabilities.pipelines.v2_improvers import compute_package_risk as compute_package_risk_v2
2425
from vulnerabilities.pipelines.v2_improvers import (
@@ -70,5 +71,6 @@
7071
compute_advisory_todo_v2.ComputeToDo,
7172
unfurl_version_range_v2.UnfurlVersionRangePipeline,
7273
compute_advisory_todo.ComputeToDo,
74+
clamv_rules.ClamVRulesImproverPipeline,
7375
]
7476
)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Generated by Django 4.2.25 on 2025-12-01 20:13
2+
3+
from django.db import migrations, models
4+
import django.db.models.deletion
5+
6+
7+
class Migration(migrations.Migration):
    """Create the ``AdvisoryDetectionRule`` model."""

    # Must run after migration 0103 of the "vulnerabilities" app.
    dependencies = [
        ("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"),
    ]

    operations = [
        migrations.CreateModel(
            name="AdvisoryDetectionRule",
            fields=[
                (
                    "id",
                    models.AutoField(
                        auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
                    ),
                ),
                (
                    "rule_text",
                    models.TextField(
                        help_text="Full text of the detection rule, script, or signature."
                    ),
                ),
                (
                    # Optional (blank allowed) rule kind, restricted to the choices below.
                    "rule_type",
                    models.CharField(
                        blank=True,
                        choices=[
                            ("yara", "YARA"),
                            ("sigma", "Sigma Detection Rule"),
                            ("clamav", "ClamAV Signature"),
                        ],
                        max_length=100,
                    ),
                ),
                (
                    "source_url",
                    models.URLField(
                        blank=True,
                        help_text="URL or reference to the source of the rule (vendor feed, GitHub repo, etc.).",
                        null=True,
                    ),
                ),
                (
                    # Required link to an AdvisoryV2 row; at this migration the
                    # rule is deleted together with its advisory (CASCADE).
                    "advisory",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        related_name="detection_rules",
                        to="vulnerabilities.advisoryv2",
                    ),
                ),
            ],
        ),
    ]
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Generated by Django 4.2.25 on 2025-12-01 21:52
2+
3+
from django.db import migrations, models
4+
import django.db.models.deletion
5+
6+
7+
class Migration(migrations.Migration):
    """Make ``AdvisoryDetectionRule.advisory`` optional and switch it to SET_NULL."""

    # Must run after the migration that created AdvisoryDetectionRule.
    dependencies = [
        ("vulnerabilities", "0104_advisorydetectionrule"),
    ]

    operations = [
        migrations.AlterField(
            model_name="advisorydetectionrule",
            name="advisory",
            # The foreign key becomes nullable; deleting the referenced
            # advisory now sets the column to NULL.
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="detection_rules",
                to="vulnerabilities.advisoryv2",
            ),
        ),
    ]

vulnerabilities/models.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3414,3 +3414,31 @@ class CodeCommit(models.Model):
34143414

34153415
class Meta:
34163416
unique_together = ("commit_hash", "vcs_url")
3417+
3418+
3419+
class AdvisoryDetectionRule(models.Model):
    """
    A detection rule (YARA, Sigma or ClamAV signature) that may be linked
    to an ``AdvisoryV2``.
    """

    # Allowed ``rule_type`` values as (stored value, human-readable label) pairs.
    RULE_TYPES = [
        ("yara", "YARA"),
        ("sigma", "Sigma Detection Rule"),
        ("clamav", "ClamAV Signature"),
    ]

    # Optional link to an advisory, reachable from the advisory side as
    # ``detection_rules``. Deleting the advisory sets this field to NULL.
    advisory = models.ForeignKey(
        AdvisoryV2,
        related_name="detection_rules",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    rule_text = models.TextField(help_text="Full text of the detection rule, script, or signature.")

    # One of RULE_TYPES; may be left blank.
    rule_type = models.CharField(max_length=100, choices=RULE_TYPES, blank=True)

    source_url = models.URLField(
        null=True,
        blank=True,
        help_text="URL or reference to the source of the rule (vendor feed, GitHub repo, etc.).",
    )
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import gzip
11+
import io
12+
import os
13+
import shutil
14+
import tarfile
15+
import tempfile
16+
from pathlib import Path
17+
from typing import List
18+
19+
import requests
20+
21+
from vulnerabilities.models import AdvisoryAlias
22+
from vulnerabilities.models import AdvisoryDetectionRule
23+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
24+
from vulnerabilities.utils import find_all_cve
25+
26+
27+
def extract_cvd(cvd_path, output_dir):
    """
    Extract the CVD file at ``cvd_path`` into ``output_dir`` and return the
    output directory as a ``Path``.

    A CVD file is a 512-byte header followed by a gzip-compressed tar archive.
    The archive is streamed straight from disk instead of being read and
    decompressed in memory, since database files can be large.

    The archive comes from the network, so every member is validated before
    extraction: members that are neither regular files nor directories are
    skipped, and ``tarfile.TarError`` is raised if a member would be written
    outside of ``output_dir``. A corrupt or non-gzip payload also surfaces as
    a ``tarfile.TarError`` subclass.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    extraction_root = output_path.resolve()

    # The "data" extraction filter only exists on Python versions that ship
    # tarfile extraction filters; detect it the documented way.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    with open(cvd_path, "rb") as f:
        f.seek(512)  # Skip header
        # Stream mode ("r|gz") decompresses sequentially from the current
        # file position, so nothing is buffered beyond the current member.
        with tarfile.open(fileobj=f, mode="r|gz") as tar:
            for member in tar:
                # A signature database only holds plain files; ignore links,
                # devices and other special entries.
                if not (member.isfile() or member.isdir()):
                    continue

                destination = (extraction_root / member.name).resolve()
                if (
                    destination != extraction_root
                    and extraction_root not in destination.parents
                ):
                    raise tarfile.TarError(
                        f"Refusing to extract {member.name!r} outside of {output_path}"
                    )

                tar.extract(member, path=output_path, **extract_kwargs)

    for file in output_path.rglob("*"):
        if file.is_file():
            file.chmod(0o644)  # rw-r--r--
    return output_path
48+
49+
50+
def parse_ndb_file(ndb_path: Path) -> List[dict]:
    """
    Return the extended signatures found in the .ndb file at ``ndb_path``.

    Each returned dict holds the first four colon-separated fields of a line
    (``name``, ``target_type``, ``offset``, ``hex_signature``) plus its
    1-based ``line_num``. Blank lines, ``#`` comments and lines with fewer
    than four fields are ignored.
    """
    field_names = ("name", "target_type", "offset", "hex_signature")
    parsed = []
    with ndb_path.open("r", encoding="utf-8", errors="ignore") as handle:
        for number, raw in enumerate(handle, 1):
            text = raw.strip()
            if text == "" or text.startswith("#"):
                continue

            fields = text.split(":")
            if len(fields) < len(field_names):
                continue

            # zip() stops at the fourth field, dropping any trailing ones.
            record = dict(zip(field_names, fields))
            record["line_num"] = number
            parsed.append(record)
    return parsed
71+
72+
73+
def parse_hdb_file(hdb_path: Path) -> List[dict]:
    """
    Return the hash signatures found in the .hdb file at ``hdb_path``.

    Each returned dict holds the first three colon-separated fields of a line
    (``hash``, ``file_size``, ``name``) plus its 1-based ``line_num``. Blank
    lines, ``#`` comments and lines with fewer than three fields are ignored.
    """
    field_names = ("hash", "file_size", "name")
    parsed = []
    with hdb_path.open("r", encoding="utf-8", errors="ignore") as handle:
        for number, raw in enumerate(handle, 1):
            text = raw.strip()
            if text == "" or text.startswith("#"):
                continue

            fields = text.split(":")
            if len(fields) < len(field_names):
                continue

            # zip() stops at the third field, dropping any trailing ones.
            record = dict(zip(field_names, fields))
            record["line_num"] = number
            parsed.append(record)
    return parsed
93+
94+
95+
def extract_cve_id(name: str):
    """
    Return the first CVE ID found in ``name``, upper-cased, or None when
    there is none. Underscores are treated as hyphens before searching.
    """
    hyphenated = name.replace("_", "-")
    return next((cve.upper() for cve in find_all_cve(hyphenated)), None)
100+
101+
102+
class ClamVRulesImproverPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Download the ClamAV main database (main.cvd), extract it, parse its
    .hdb and .ndb signature files, and store every signature whose name
    contains a CVE ID as an ``AdvisoryDetectionRule`` attached to the
    advisories aliased by that CVE.
    """

    pipeline_id = "clamv_rules"
    MAIN_DATABASE_URL = "https://database.clamav.net/main.cvd"
    # NOTE(review): the license URL is empty and the expression below is not
    # an SPDX identifier — confirm the intended values.
    license_url = ""
    license_expression = "GNU GENERAL PUBLIC LICENSE"

    @classmethod
    def steps(cls):
        return (
            cls.download_database,
            cls.extract_database,
            cls.collect_and_store_advisories,
            cls.clean_downloads,
        )

    def download_database(self):
        """
        Download main.cvd into a fresh temporary directory and record that
        directory as ``self.db_dir``.

        Raise ``requests.HTTPError`` when the server answers with an error
        status.
        """
        self.log("Downloading ClamAV database…")
        # Remember the mkdtemp() directory itself so that cleanup can remove
        # it too, not only the "clamav_db" child.
        self._temp_root = Path(tempfile.mkdtemp())
        self.db_dir = self._temp_root / "clamav_db"
        self.db_dir.mkdir(parents=True, exist_ok=True)

        database_url = f"{self.MAIN_DATABASE_URL}?api-version=1"
        # NOTE(review): the User-Agent still carries a placeholder project
        # URL; kept as-is because the download server may filter on it.
        headers = {
            "User-Agent": "ClamAV-Client/1.0 (https://github.com/yourproject)",
            "Accept": "*/*",
        }

        filename = self.db_dir / "main.cvd"
        self.log(f"Downloading {database_url} to {filename}")

        # Using the response as a context manager releases the streamed
        # connection even if writing to disk fails.
        with requests.get(database_url, headers=headers, stream=True, timeout=30) as resp:
            resp.raise_for_status()
            with filename.open("wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        self.log("ClamAV DB file downloaded successfully.")

    def extract_database(self):
        """Extract the downloaded CVD into ``<db_dir>/extracted``."""
        out_dir = self.db_dir / "extracted"
        self.extract_cvd_dir = extract_cvd(self.db_dir / "main.cvd", out_dir)
        self.log(f"Extracted CVD to {self.extract_cvd_dir}")

    def collect_and_store_advisories(self):
        """
        Parse main.hdb and main.ndb and store the CVE-named signatures as
        detection rules.

        Only one signature is kept per CVE (the last one seen, .ndb entries
        overriding .hdb ones) and one "clamav" rule is kept per advisory.
        NOTE(review): ``rule_text`` is the ``str()`` of the parsed entry, not
        the raw signature line — confirm this is the intended format.
        """
        sources = (
            (parse_hdb_file, self.extract_cvd_dir / "main.hdb"),
            (parse_ndb_file, self.extract_cvd_dir / "main.ndb"),
        )

        rules = {}
        for parse, signature_path in sources:
            # A database lacking one of the two files should not abort the run.
            if not signature_path.is_file():
                self.log(
                    f"Signature file {signature_path.name} not found in the "
                    "extracted database; skipping."
                )
                continue

            for entry in parse(signature_path):
                cve = extract_cve_id(entry.get("name", ""))
                if cve:
                    rules[cve] = entry

        rules_added = 0
        for cve_id, entry in rules.items():
            try:
                alias = AdvisoryAlias.objects.get(alias=cve_id)
            except AdvisoryAlias.DoesNotExist:
                self.log(f"Advisory {cve_id} not found.")
                continue

            for advisory in alias.advisories.all():
                AdvisoryDetectionRule.objects.update_or_create(
                    advisory=advisory,
                    rule_type="clamav",
                    defaults={
                        "rule_text": str(entry),
                    },
                )
                # Count rules actually written, matching the log message below.
                rules_added += 1

        self.log(f"Successfully added/updated {rules_added} rules for advisories.")

    def clean_downloads(self):
        """Remove the download directory and the temporary directory holding it."""
        removed = False
        # ``_temp_root`` is only set by download_database(); ``db_dir`` is
        # still handled on its own in case it was assigned some other way.
        for directory in (getattr(self, "db_dir", None), getattr(self, "_temp_root", None)):
            if directory and os.path.exists(directory):
                shutil.rmtree(directory, ignore_errors=True)
                removed = True

        if removed:
            self.log("Cleaned up downloaded files.")

    def on_failure(self):
        """Ensure cleanup on failure."""
        self.clean_downloads()

0 commit comments

Comments
 (0)