Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ COPY pyproject.toml poetry.lock* /app/

# Configure Poetry and install dependencies
RUN poetry config virtualenvs.create false && \
poetry install --no-dev
poetry install --no-dev && \
python -m spacy download en_core_web_sm

# Copy the rest of the application
COPY . /app
Expand Down
Binary file modified codegate_volume/models/all-minilm-L6-v2-q5_k_m.gguf
Binary file not shown.
1,078 changes: 1,010 additions & 68 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions prompts/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ secrets_redacted: |
about any tokens, passwords or similar sensitive information in the context whose value begins with
the string "REDACTED".

pii_redacted: |
The context files contain redacted personally identifiable information (PII) that is represented by a UUID encased within <>. For example:
- <123e4567-e89b-12d3-a456-426614174000>
- <2d040296-98e9-4350-84be-fda4336057eb>
If you encounter any PII redacted with a UUID, DO NOT WARN the user about it. Simply respond to the user request and keep the PII redacted and intact, using the same UUID.

# Security-focused prompts
security_audit: "You are a security expert conducting a thorough code review. Identify potential security vulnerabilities, suggest improvements, and explain security best practices."

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ sqlalchemy = "==2.0.37"
aiosqlite = "==0.20.0"
ollama = "==0.4.7"
pydantic-settings = "==2.7.1"
numpy = "==2.2.2"
numpy = "==1.26.4"
tree-sitter = "==0.24.0"
tree-sitter-go = "==0.23.4"
tree-sitter-java = "==0.23.5"
Expand All @@ -33,6 +33,9 @@ greenlet = "==3.1.1"
cachetools = "==5.5.1"
legacy-cgi = "==2.6.2"

presidio = "^0.1.0"
presidio-analyzer = "^2.2.357"
presidio-anonymizer = "^2.2.357"
[tool.poetry.group.dev.dependencies]
pytest = "==8.3.4"
pytest-cov = "==6.0.0"
Expand Down
57 changes: 56 additions & 1 deletion src/codegate/codegate_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,47 @@ def _missing_(cls, value: str) -> Optional["LogFormat"]:
)


# Define all LiteLLM logger names
LITELLM_LOGGERS = ["LiteLLM Proxy", "LiteLLM Router", "LiteLLM"]


def configure_litellm_logging(enabled: bool = False, level: LogLevel = LogLevel.INFO) -> None:
    """Switch the LiteLLM loggers on or off.

    The same treatment is applied to the base ``"litellm"`` logger and to
    every name listed in ``LITELLM_LOGGERS``.

    Args:
        enabled: When False the loggers are disabled and their level is raised
            above CRITICAL. When True they are given ``level``, stop
            propagating, and have their handlers replaced by one
            ``StreamHandler``.
        level: Log level applied to the loggers and their handler when enabled.
    """
    # The main logger is handled first, followed by the specific LiteLLM ones.
    for name in ("litellm", *LITELLM_LOGGERS):
        target = logging.getLogger(name)
        target.disabled = not enabled

        if enabled:
            # The level is only resolved when logging is actually enabled.
            stdlib_level = getattr(logging, level.value)
            target.setLevel(stdlib_level)
            target.propagate = False
            # Drop whatever handlers were attached before and route records
            # through a single fresh stream handler.
            target.handlers.clear()
            stream_handler = logging.StreamHandler()
            stream_handler.setLevel(stdlib_level)
            target.addHandler(stream_handler)
        else:
            # One step above CRITICAL: nothing can pass the level check.
            target.setLevel(logging.CRITICAL + 1)


def add_origin(logger, log_method, event_dict):
# Add 'origin' if it's bound to the logger but not explicitly in the event dict
if "origin" not in event_dict and hasattr(logger, "_context"):
Expand All @@ -58,13 +99,17 @@ def add_origin(logger, log_method, event_dict):


def setup_logging(
log_level: Optional[LogLevel] = None, log_format: Optional[LogFormat] = None
log_level: Optional[LogLevel] = None,
log_format: Optional[LogFormat] = None,
external_loggers: Optional[Dict[str, bool]] = None,
) -> logging.Logger:
"""Configure the logging system.

Args:
log_level: The logging level to use. Defaults to INFO if not specified.
log_format: The log format to use. Defaults to JSON if not specified.
external_loggers: Dictionary of external logger names and whether they should be enabled.
e.g. {"litellm": False, "sqlalchemy": False, "uvicorn.error": False}

This configures two handlers:
- stderr_handler: For ERROR, CRITICAL, and WARNING messages
Expand All @@ -74,6 +119,16 @@ def setup_logging(
log_level = LogLevel.INFO
if log_format is None:
log_format = LogFormat.JSON
if external_loggers is None:
external_loggers = {
"litellm": False,
"sqlalchemy": False,
"uvicorn.error": False,
"aiosqlite": False,
}

# Configure LiteLLM logging based on external_loggers setting
configure_litellm_logging(enabled=external_loggers.get("litellm", False), level=log_level)

# The configuration was taken from structlog documentation
# https://www.structlog.org/en/stable/standard-library.html
Expand Down
13 changes: 11 additions & 2 deletions src/codegate/pipeline/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
from codegate.pipeline.extract_snippets.extract_snippets import CodeSnippetExtractor
from codegate.pipeline.extract_snippets.output import CodeCommentStep
from codegate.pipeline.output import OutputPipelineProcessor, OutputPipelineStep
from codegate.pipeline.pii.pii import (
CodegatePii,
PiiRedactionNotifier,
PiiUnRedactionStep,
)
from codegate.pipeline.secrets.manager import SecretsManager
from codegate.pipeline.secrets.secrets import (
CodegateSecrets,
Expand All @@ -22,11 +27,12 @@ def __init__(self, secrets_manager: SecretsManager):

def create_input_pipeline(self) -> SequentialPipelineProcessor:
input_steps: List[PipelineStep] = [
# make sure that this step is always first in the pipeline
# make sure that these steps are always first in the pipeline
# the other steps might send the request to a LLM for it to be analyzed
# and without obfuscating the secrets, we'd leak the secrets during those
# and without obfuscating the secrets/PII, we'd leak them during those
# later steps
CodegateSecrets(),
CodegatePii(),
CodegateCli(),
CodeSnippetExtractor(),
CodegateContextRetriever(),
Expand All @@ -37,13 +43,16 @@ def create_input_pipeline(self) -> SequentialPipelineProcessor:
def create_fim_pipeline(self) -> SequentialPipelineProcessor:
    """Build the FIM input pipeline.

    Returns:
        A ``SequentialPipelineProcessor`` flagged with ``is_fim=True`` that
        runs the secrets step followed by the PII step.
    """
    redaction_steps: List[PipelineStep] = [CodegateSecrets(), CodegatePii()]
    return SequentialPipelineProcessor(
        redaction_steps,
        self.secrets_manager,
        is_fim=True,
    )

def create_output_pipeline(self) -> OutputPipelineProcessor:
    """Build the output pipeline.

    Returns:
        An ``OutputPipelineProcessor`` whose steps are, in order: the secret
        and PII redaction notifiers, the secret and PII un-redaction steps,
        and finally the code comment step.
    """
    ordered_steps: List[OutputPipelineStep] = []
    # Notifiers come first ...
    ordered_steps.append(SecretRedactionNotifier())
    ordered_steps.append(PiiRedactionNotifier())
    # ... then the steps that put the original values back ...
    ordered_steps.append(SecretUnredactionStep())
    ordered_steps.append(PiiUnRedactionStep())
    # ... and the code comment step runs last.
    ordered_steps.append(CodeCommentStep())
    return OutputPipelineProcessor(ordered_steps)
Expand Down
141 changes: 141 additions & 0 deletions src/codegate/pipeline/pii/analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import uuid
from typing import Any, Dict, List, Tuple

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine


class PiiSessionStore:
    """
    Holds the placeholder -> PII mappings that belong to one session.

    Attributes:
        session_id (str): Identifier of the session. A random UUID string is
            generated when none (or an empty one) is supplied.
        mappings (Dict[str, str]): Maps each ``<uuid>`` placeholder to the PII
            string it stands for.
    """

    def __init__(self, session_id: str = None):
        # Fall back to a freshly generated identifier for a missing/empty id.
        if not session_id:
            session_id = str(uuid.uuid4())
        self.session_id = session_id
        self.mappings: Dict[str, str] = {}

    def add_mapping(self, pii: str) -> str:
        """Register ``pii`` under a new ``<uuid>`` placeholder and return the placeholder."""
        placeholder = "<{}>".format(uuid.uuid4())
        self.mappings[placeholder] = pii
        return placeholder

    def get_pii(self, uuid_placeholder: str) -> str:
        """Return the PII stored for ``uuid_placeholder``, or the placeholder itself if unknown."""
        try:
            return self.mappings[uuid_placeholder]
        except KeyError:
            return uuid_placeholder


class PiiAnalyzer:
    """
    Detects PII in text and swaps it for reversible ``<uuid>`` placeholders.

    The instance owns a single ``PiiSessionStore``. Every ``analyze`` call adds
    its placeholder -> PII mappings to that store and returns it, so the same
    store can later be passed to ``restore_pii``.

    Methods:
        analyze(text):
            Returns the anonymized text, the list of PII details that were
            found, and the session store holding the mappings.
        restore_pii(anonymized_text, session_store):
            Returns the text with every known placeholder replaced by the
            original PII.
    """

    # Entity types requested from the analyzer engine on every ``analyze`` call.
    _ENTITIES = (
        "PHONE_NUMBER",
        "EMAIL_ADDRESS",
        "CREDIT_CARD",
        "CRYPTO",
        "IBAN_CODE",
        "IP_ADDRESS",
        "NRP",
        "MEDICAL_LICENSE",
        "US_BANK_NUMBER",
        "US_DRIVER_LICENSE",
        "US_ITIN",
        "US_PASSPORT",
        "US_SSN",
        "UK_NHS",
        "UK_NINO",
    )

    def __init__(self):
        """Create the analyzer/anonymizer engines from the bundled spaCy configuration."""
        import os

        from presidio_analyzer.nlp_engine import NlpEngineProvider

        # The custom spaCy config lives next to this module.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        config_path = os.path.join(current_dir, "spacy_config.yaml")

        # Initialize the NLP engine with our custom configuration
        provider = NlpEngineProvider(conf_file=config_path)
        nlp_engine = provider.create_engine()

        # Create analyzer with custom NLP engine
        self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine)
        self.anonymizer = AnonymizerEngine()
        self.session_store = PiiSessionStore()

    @staticmethod
    def _merge_overlapping(results) -> List[List[Any]]:
        """Collapse overlapping results into ``[start, end, best_result]`` groups sorted by start."""
        merged: List[List[Any]] = []
        # Zero-length spans carry no text to redact and are ignored.
        usable = (r for r in results if r.end > r.start)
        for result in sorted(usable, key=lambda r: (r.start, -r.end)):
            if merged and result.start < merged[-1][1]:
                group = merged[-1]
                # Cover the union of the spans so no tail of a match is left behind.
                group[1] = max(group[1], result.end)
                if result.score > group[2].score:
                    group[2] = result
            else:
                merged.append([result.start, result.end, result])
        return merged

    def analyze(self, text: str) -> Tuple[str, List[Dict[str, Any]], "PiiSessionStore"]:
        """
        Find PII in ``text`` and replace each detected span with a placeholder.

        Replacement is done by character span rather than by searching for the
        PII value, so text that merely contains a detected value as a
        substring is left untouched and previously inserted placeholders can
        never be rewritten. Overlapping detections are merged into one span
        (reported with the type and score of the highest-scoring detection),
        and a value that is detected several times in the same call shares a
        single placeholder.

        Args:
            text (str): The text to analyze for PII.

        Returns:
            Tuple[str, List[Dict[str, Any]], PiiSessionStore]: The anonymized
            text, one detail dict per replaced span (``type``, ``value``,
            ``score``, ``start``, ``end``, ``uuid_placeholder``; offsets refer
            to the original ``text``), and the session store. When nothing is
            found the original text and an empty list are returned.
        """
        analyzer_results = self.analyzer.analyze(
            text=text, entities=list(self._ENTITIES), language="en"
        )
        if not analyzer_results:
            return text, [], self.session_store

        found_pii: List[Dict[str, Any]] = []
        placeholder_by_value: Dict[str, str] = {}
        pieces: List[str] = []
        cursor = 0

        for start, end, best in self._merge_overlapping(analyzer_results):
            pii_value = text[start:end]
            uuid_placeholder = placeholder_by_value.get(pii_value)
            if uuid_placeholder is None:
                uuid_placeholder = self.session_store.add_mapping(pii_value)
                placeholder_by_value[pii_value] = uuid_placeholder
            found_pii.append(
                {
                    "type": best.entity_type,
                    "value": pii_value,
                    "score": best.score,
                    "start": start,
                    "end": end,
                    "uuid_placeholder": uuid_placeholder,
                }
            )
            # Copy the untouched text before the span, then the placeholder.
            pieces.append(text[cursor:start])
            pieces.append(uuid_placeholder)
            cursor = end
        pieces.append(text[cursor:])

        return "".join(pieces), found_pii, self.session_store

    def restore_pii(self, anonymized_text: str, session_store: "PiiSessionStore") -> str:
        """
        Restore the original PII in the given anonymized text.

        Every placeholder recorded in ``session_store.mappings`` that occurs in
        the text is replaced by the PII value it maps to.

        Args:
            anonymized_text (str): The text containing placeholders for PII.
            session_store (PiiSessionStore): The session store containing mappings of
                placeholders to original PII.

        Returns:
            str: The text with the original PII restored.
        """
        for uuid_placeholder, original_pii in session_store.mappings.items():
            anonymized_text = anonymized_text.replace(uuid_placeholder, original_pii)
        return anonymized_text
67 changes: 67 additions & 0 deletions src/codegate/pipeline/pii/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any, Dict, List, Tuple

import structlog

from codegate.pipeline.pii.analyzer import PiiAnalyzer, PiiSessionStore

logger = structlog.get_logger("codegate")


class PiiManager:
    """
    Manages the analysis and restoration of Personally Identifiable Information (PII) in text.

    Attributes:
        analyzer (PiiAnalyzer): Performs the PII detection and restoration.
        current_session (PiiSessionStore): Session store returned by the most
            recent ``analyze`` call; ``None`` until ``analyze`` has run.

    Methods:
        analyze(text):
            Anonymizes ``text`` and logs what kind of PII was detected
            (never the values themselves).
        restore_pii(anonymized_text):
            Restores the PII in ``anonymized_text`` using the current session.
    """

    def __init__(self):
        self.analyzer = PiiAnalyzer()
        # No session exists until analyze() has been called.
        self.current_session: PiiSessionStore = None

    def analyze(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
        """
        Analyze ``text`` for PII and anonymize it.

        The session store returned by the analyzer becomes ``current_session``.

        Args:
            text (str): The text to be analyzed for PII.

        Returns:
            Tuple[str, List[Dict[str, Any]]]: The anonymized text and the list
            of found PII details.
        """
        anonymized_text, found_pii, self.current_session = self.analyzer.analyze(text)

        for pii in found_pii:
            logger.info(
                "PII detected",
                pii_type=pii["type"],
                value="*" * len(pii["value"]),  # Don't log actual value
                score=f"{pii['score']:.2f}",
            )

        return anonymized_text, found_pii

    def restore_pii(self, anonymized_text: str) -> str:
        """
        Restore the PII in ``anonymized_text`` using the current session.

        Args:
            anonymized_text (str): The text with anonymized PII to be restored.

        Returns:
            str: The text with restored PII, or ``anonymized_text`` unchanged
            when no session is available.
        """
        if self.current_session is None:
            logger.warning("No active PII session found. Unable to restore PII.")
            return anonymized_text

        logger.debug("Restoring PII from session.")
        restored_text = self.analyzer.restore_pii(anonymized_text, self.current_session)
        # Only sizes are logged: the restored text contains the raw PII and the
        # anonymized text is still user content, so neither belongs in logs.
        logger.debug(
            "PII restoration finished.",
            anonymized_length=len(anonymized_text),
            restored_length=len(restored_text),
        )
        return restored_text
Loading
Loading