Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 91 additions & 9 deletions src/codemodder/codetf/v3/codetf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from pydantic import BaseModel, model_validator

from ..common import Change, CodeTFWriter, Finding, FixQuality
from ..v2.codetf import AIMetadata as AIMetadatav2
from ..v2.codetf import CodeTF as CodeTFv2
from ..v2.codetf import Finding as V2Finding
from ..v2.codetf import Result
from ..v2.codetf import Run as Runv2


class Run(BaseModel):
Expand All @@ -15,17 +19,17 @@ class Run(BaseModel):
vendor: str
tool: str
version: str
# Optional free-form metadata about the project being analyzed
# e.g. project name, directory, commit SHA, etc.
projectMetadata: Optional[str] = None
# Analysis duration in milliseconds
elapsed: Optional[int] = None
# Optional free-form metadata about the inputs used for the analysis
# optional free-form metadata about the project being analyzed
# e.g. project name, directory, commit sha, etc.
projectmetadata: dict | None = None
# analysis duration in milliseconds
elapsed: int | None = None
# optional free-form metadata about the inputs used for the analysis
# e.g. command line, environment variables, etc.
inputMetadata: Optional[dict] = None
# Optional free-form metadata about the analysis itself
inputmetadata: dict | None = None
# optional free-form metadata about the analysis itself
# e.g. timeouts, memory usage, etc.
analysisMetadata: Optional[dict] = None
analysismetadata: dict | None = None


class FixStatusType(str, Enum):
Expand Down Expand Up @@ -116,3 +120,81 @@ def validate_fixMetadata(self):
class CodeTF(CodeTFWriter, BaseModel):
    # Top-level model of this (v3) CodeTF module: one run description plus
    # the list of fix results produced for that run.

    # Metadata describing the run (a ``Run`` instance).
    run: Run
    # One ``FixResult`` entry per reported result.
    results: list[FixResult]


def from_v2_run(run: Runv2) -> Run:
    """Build a v3 ``Run`` from a v2 run.

    ``vendor``, ``tool``, ``version`` and ``elapsed`` are copied as-is.
    The v2 ``directory`` (always) and ``projectName`` (only when truthy) are
    stored in ``projectmetadata``; the v2 ``commandLine`` (always) and
    ``sarifs`` (only when truthy) are stored in ``inputmetadata``.
    ``analysismetadata`` is not set by this conversion.
    """
    project_info: dict = {"directory": run.directory}
    if run.projectName:
        project_info["projectName"] = run.projectName

    input_info: dict = {"commandLine": run.commandLine}
    if run.sarifs:
        input_info["sarifs"] = run.sarifs

    return Run(
        vendor=run.vendor,
        tool=run.tool,
        version=run.version,
        elapsed=run.elapsed,
        projectmetadata=project_info,
        inputmetadata=input_info,
    )


def from_v2_aimetadata(ai_metadata: AIMetadatav2) -> AIMetadata:
    """Translate v2 AI metadata into the v3 ``AIMetadata`` shape.

    The single v2 ``model`` becomes a one-element ``models`` list (or ``None``
    when the v2 model is falsy), and the v2 ``tokens`` value is passed as
    ``total_tokens``. ``provider`` and ``completion_tokens`` are copied.
    """
    provider = ai_metadata.provider

    model_names = None
    if ai_metadata.model:
        model_names = [ai_metadata.model]

    return AIMetadata(
        provider=provider,
        models=model_names,
        total_tokens=ai_metadata.tokens,
        completion_tokens=ai_metadata.completion_tokens,
    )


def from_v2_result(result: Result) -> list[FixResult]:
    """Flatten one v2 ``Result`` into a list of v3 ``FixResult`` objects.

    One ``FixResult`` with status ``fixed`` is produced for every finding
    listed in ``fixedFindings`` of every change of every changeset, followed
    by one ``FixResult`` with status ``failed`` for every entry of
    ``unfixedFindings``.
    """
    converted: list[FixResult] = []

    # Fixed findings.
    for v2_changeset in result.changeset:
        # The v2 metadata alone cannot identify hybrid AI codemods, so the
        # presence of AI metadata is the only signal used for the strategy.
        if v2_changeset.ai:
            strategy = Strategy.ai
            ai_details = from_v2_aimetadata(v2_changeset.ai)
        else:
            strategy = Strategy.deterministic
            ai_details = None
        generation = GenerationMetadata(
            strategy=strategy,
            ai=ai_details,
            provisional=False,
        )

        for change in v2_changeset.changes:
            for fixed_finding in change.fixedFindings or []:
                metadata = FixMetadata(
                    id=result.codemod,
                    summary=result.summary,
                    description=result.description,
                    generation=generation,
                )
                # The diff comes from the enclosing changeset because
                # individual per-change diffs may not exist. If the codetf was
                # generated per-finding, each changeset holds a single change
                # anyway.
                single_change_set = ChangeSet(
                    path=v2_changeset.path,
                    diff=v2_changeset.diff,
                    changes=[change.to_common()],
                )
                converted.append(
                    FixResult(
                        finding=fixed_finding,
                        fixStatus=FixStatus(status=FixStatusType.fixed),
                        changeSets=[single_change_set],
                        fixMetadata=metadata,
                    )
                )

    # Unfixed findings: carry over the v2 reason as the failure reason.
    converted.extend(
        FixResult(
            finding=unfixed_finding,
            fixStatus=FixStatus(
                status=FixStatusType.failed, reason=unfixed_finding.reason
            ),
        )
        for unfixed_finding in result.unfixedFindings or []
    )

    return converted


def from_v2(codetf: CodeTFv2) -> CodeTF:
    """Convert a whole v2 CodeTF document into its v3 counterpart.

    The run is converted with ``from_v2_run`` and every v2 result is expanded
    with ``from_v2_result``; the expanded lists are concatenated in the order
    of the v2 results.
    """
    converted_run = from_v2_run(codetf.run)

    flattened: list[FixResult] = []
    for v2_result in codetf.results:
        flattened.extend(from_v2_result(v2_result))

    return CodeTF(run=converted_run, results=flattened)
1 change: 1 addition & 0 deletions tests/samples/codetfv2_sample.codetf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"run":{"vendor":"pixee","tool":"codemodder-python","version":"6.2.3.dev2+gba1bb73","commandLine":"codemodder --dry-run repo --path-include=code.py --codemod-include=sonar:python/secure-tempfile --output out.codetf --sonar-json temp_sonar_issues.json --verbose","elapsed":206,"directory":"/home/andrecs/pixee/codemodder-python/repo","sarifs":[]},"results":[{"codemod":"sonar:python/secure-tempfile","summary":"Upgrade and Secure Temp File Creation","description":"This codemod replaces all `tempfile.mktemp` calls with the more secure `tempfile.NamedTemporaryFile`\n\nThe Python [tempfile documentation](https://docs.python.org/3/library/tempfile.html#tempfile.mktemp) is explicit that `tempfile.mktemp` should be deprecated to avoid an unsafe and unexpected race condition. `tempfile.mktemp` does not handle the possibility that the returned file name could already be used by another process by the time your code opens the file. A more secure approach to create temporary files is to use `tempfile.NamedTemporaryFile` which will create the file for you and handle all security conditions. \n\nThe changes from this codemod look like this:\n\n```diff\n import tempfile\n- filename = tempfile.mktemp()\n+ with tempfile.NamedTemporaryFile(delete=False) as tf:\n+ filename = tf.name\n```\n\nThe change sets `delete=False` to closely follow your code's intention when calling `tempfile.mktemp`. 
However, you should use this as a starting point to determine when your temporary file should be deleted.\n","detectionTool":{"name":"Sonar"},"references":[{"url":"https://docs.python.org/3/library/tempfile.html#tempfile.mktemp","description":"https://docs.python.org/3/library/tempfile.html#tempfile.mktemp"},{"url":"https://cwe.mitre.org/data/definitions/377","description":"https://cwe.mitre.org/data/definitions/377"},{"url":"https://cwe.mitre.org/data/definitions/379","description":"https://cwe.mitre.org/data/definitions/379"},{"url":"https://rules.sonarsource.com/python/RSPEC-5445/","description":"Insecure temporary file creation methods should not be used"}],"properties":{},"failedFiles":[],"changeset":[{"path":"code.py","diff":"--- \n+++ \n@@ -2,5 +2,7 @@\n \n tmp_file = open(tempfile.mktemp(), \"w+\")\n tmp_file.write(\"text\")\n-filename = tempfile.mktemp()\n-filename_2 = tempfile.mktemp()\n+with tempfile.NamedTemporaryFile(delete=False) as tf:\n+ filename = tf.name\n+with tempfile.NamedTemporaryFile(delete=False) as tf:\n+ filename_2 = tf.name\n","changes":[{"lineNumber":5,"description":"Replaces `tempfile.mktemp` with `tempfile.mkstemp`.","diffSide":"right","fixedFindings":[{"id":"2mzYQLBPCYSBxYekUmkYOzcfIBk=","rule":{"id":"python:S5445","name":"Insecure temporary file creation methods should not be used","url":"https://rules.sonarsource.com/python/RSPEC-5445/"}}]},{"lineNumber":6,"description":"Replaces `tempfile.mktemp` with `tempfile.mkstemp`.","diffSide":"right","fixedFindings":[{"id":"rsaOe8uxk1JZ/mBTOPQIuh4tLas=","rule":{"id":"python:S5445","name":"Insecure temporary file creation methods should not be used","url":"https://rules.sonarsource.com/python/RSPEC-5445/"}}]}],"strategy":"deterministic","provisional":false}],"unfixedFindings":[{"id":"DmwOEj9aQKWqDyQ4MpDBx/rxFQ4=","rule":{"id":"python:S5445","name":"python:S5445","url":"https://rules.sonarsource.com/python/RSPEC-5445/"},"path":"code.py","lineNumber":3,"reason":"Pixee does not yet support this 
fix."}]}]}
98 changes: 98 additions & 0 deletions tests/test_codetf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
Rule,
)
from codemodder.codetf.v3.codetf import Finding as FindingV3
from codemodder.codetf.v3.codetf import FixStatusType, from_v2


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -186,3 +187,100 @@ def test_v2_finding_id_optional():
def test_v3_finding_id_not_optional():
with pytest.raises(ValidationError):
FindingV3(id=None, rule=Rule(id="foo", name="whatever")) # type: ignore[arg-type]


def test_v2_to_v3_conversion():
    """Convert the v2 sample with ``from_v2`` and check the resulting document.

    Covers the run metadata mapping, the number of fixed/unfixed results, the
    fix metadata copied from the v2 result, and the association between each
    fixed finding and the change that fixed it.
    """
    # Only reading/parsing needs the file handle, so the ``with`` block is
    # kept minimal and the file is closed before any assertion runs.
    with open(
        "tests/samples/codetfv2_sample.codetf", "r", encoding="utf-8"
    ) as sample_file:
        codetfv2 = CodeTF.model_validate_json(sample_file.read())
    codetf = from_v2(codetfv2)

    # run
    assert codetf.run
    assert codetf.run.vendor == codetfv2.run.vendor
    assert codetf.run.tool == codetfv2.run.tool
    assert codetf.run.version == codetfv2.run.version
    assert codetf.run.elapsed == codetfv2.run.elapsed

    assert (
        codetf.run.projectmetadata
        and "directory" in codetf.run.projectmetadata
        and codetf.run.projectmetadata["directory"] == codetfv2.run.directory
    )
    # The sample has no project name, so the key must be absent.
    assert (
        codetf.run.projectmetadata
        and "projectName" not in codetf.run.projectmetadata
        and not codetfv2.run.projectName
    )

    assert (
        codetf.run.inputmetadata
        and "commandLine" in codetf.run.inputmetadata
        and codetf.run.inputmetadata["commandLine"] == codetfv2.run.commandLine
    )
    # The sample has an empty sarifs list, so the key must be absent.
    assert not codetfv2.run.sarifs
    assert codetf.run.inputmetadata and "sarifs" not in codetf.run.inputmetadata

    # results
    v2_unfixed = [f for r in codetfv2.results for f in r.unfixedFindings or []]
    v2_fixed = [
        f
        for r in codetfv2.results
        for cs in r.changeset
        for c in cs.changes
        for f in c.fixedFindings or []
    ]
    unfixed = [
        fr for fr in codetf.results if fr.fixStatus.status == FixStatusType.failed
    ]
    fixed = [fr for fr in codetf.results if fr.fixStatus.status == FixStatusType.fixed]

    # length
    assert len(codetf.results) == len(v2_unfixed) + len(v2_fixed) == 3
    assert len(unfixed) == len(v2_unfixed) == 1
    assert len(fixed) == len(v2_fixed) == 2

    assert len(codetfv2.results) == 1
    assert len(codetfv2.results[0].changeset) == 1
    v2result = codetfv2.results[0]
    v2changeset = codetfv2.results[0].changeset[0]
    # Maps every fixed v2 finding to the v2 change that lists it.
    v2_finding_to_change = {
        f: c
        for r in codetfv2.results
        for cs in r.changeset
        for c in cs.changes
        for f in c.fixedFindings or []
    }

    for fix_result in fixed:
        # fix metadata
        assert (
            fix_result.fixMetadata
            and fix_result.fixMetadata.generation
            and fix_result.fixMetadata.generation.ai == v2changeset.ai
        )
        assert (
            fix_result.fixMetadata
            and fix_result.fixMetadata.id
            and fix_result.fixMetadata.id == v2result.codemod
        )
        assert (
            fix_result.fixMetadata
            and fix_result.fixMetadata.summary
            and fix_result.fixMetadata.summary == v2result.summary
        )
        assert (
            fix_result.fixMetadata
            and fix_result.fixMetadata.description
            and fix_result.fixMetadata.description == v2result.description
        )

        # correctly associates findings to the change
        assert fix_result.changeSets and fix_result.changeSets[0].path == v2changeset.path
        assert fix_result.changeSets and fix_result.changeSets[0].diff == v2changeset.diff
        assert isinstance(fix_result.finding, Finding) and fix_result.changeSets[
            0
        ].changes == [v2_finding_to_change[fix_result.finding].to_common()]

    # unfixed metadata
    assert (
        unfixed[0].fixStatus.reason
        and unfixed[0].fixStatus.reason == v2_unfixed[0].reason
    )
    assert unfixed[0].finding == v2_unfixed[0]
Loading