Skip to content
This repository was archived by the owner on Sep 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

# sometimes we pull version info before dispatch is totally installed
try:
from dispatch.ai.prompt.models import Prompt # noqa lgtm[py/unused-import]
from dispatch.organization.models import Organization # noqa lgtm[py/unused-import]
from dispatch.project.models import Project # noqa lgtm[py/unused-import]
from dispatch.route.models import Recommendation # noqa lgtm[py/unused-import]
Expand Down
Empty file added src/dispatch/ai/__init__.py
Empty file.
28 changes: 28 additions & 0 deletions src/dispatch/ai/enums.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dispatch.enums import DispatchEnum
from enum import IntEnum


class AIEventSource(DispatchEnum):
Expand All @@ -13,3 +14,30 @@ class AIEventDescription(DispatchEnum):
read_in_summary_created = "AI-generated read-in summary created for {participant_email}"

tactical_report_created = "AI-generated tactical report created for incident {incident_name}"


class GenAIType(IntEnum):
"""GenAI prompt types for different AI operations."""

TAG_RECOMMENDATION = 1
INCIDENT_SUMMARY = 2
SIGNAL_ANALYSIS = 3
CONVERSATION_SUMMARY = 4
TACTICAL_REPORT_SUMMARY = 5

@property
def display_name(self) -> str:
"""Get the human-friendly display name for the type."""
display_names = {
self.TAG_RECOMMENDATION: "Tag Recommendation",
self.INCIDENT_SUMMARY: "Incident Summary",
self.SIGNAL_ANALYSIS: "Signal Analysis",
self.CONVERSATION_SUMMARY: "Conversation Summary",
self.TACTICAL_REPORT_SUMMARY: "Tactical Report Summary",
}
return display_names.get(self, f"Unknown Type ({self.value})")

@classmethod
def get_all_types(cls) -> list[dict]:
"""Get all types with their IDs and display names."""
return [{"id": type_enum.value, "name": type_enum.display_name} for type_enum in cls]
71 changes: 63 additions & 8 deletions src/dispatch/ai/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
from typing import List
from pydantic import Field

from dispatch.models import DispatchBase
from dispatch.tag.models import TagTypeRecommendation


class TagRecommendations(DispatchBase):
"""
Model for structured tag recommendations output from AI analysis.

This model ensures the AI response contains properly structured tag recommendations
grouped by tag type.
"""

recommendations: list[TagTypeRecommendation] = Field(
description="List of tag recommendations grouped by tag type", default_factory=list
)


class ReadInSummary(DispatchBase):
Expand All @@ -12,10 +25,10 @@ class ReadInSummary(DispatchBase):
actions taken, and current status sections.
"""

timeline: List[str] = Field(
timeline: list[str] = Field(
description="Chronological list of key events and decisions", default_factory=list
)
actions_taken: List[str] = Field(
actions_taken: list[str] = Field(
description="List of actions that were taken to address the security event",
default_factory=list,
)
Expand All @@ -41,21 +54,63 @@ class TacticalReport(DispatchBase):
Model for structured tactical report output from AI analysis. Enforces the presence of fields
dedicated to the incident's conditions, actions, and needs.
"""

conditions: str = Field(
description="Summary of incident circumstances, with focus on scope and impact", default=""
)
actions: str | list[str] = Field(
description="Chronological list of actions and analysis by both the party instigating the incident and the response team",
default_factory=list
actions: list[str] = Field(
description=(
"Chronological list of actions and analysis by both the party instigating "
"the incident and the response team"
),
default_factory=list,
)
needs: str | list[str] = Field(
description="Identified and unresolved action items from the incident, or an indication that the incident is at resolution", default=""
needs: list[str] = Field(
description=(
"Identified and unresolved action items from the incident, or an indication "
"that the incident is at resolution"
),
default_factory=list,
)


class TacticalReportResponse(DispatchBase):
"""
Response model for tactical report generation. Includes the structured summary and any error messages.
"""

tactical_report: TacticalReport | None = None
error_message: str | None = None


class CaseSignalSummary(DispatchBase):
"""
Model for structured case signal summary output from AI analysis.

This model represents the specific structure expected from the GenAI signal analysis prompt.
"""

summary: str = Field(
description="4-5 sentence summary of the security event using precise, factual language",
default="",
)
historical_summary: str = Field(
description="2-3 sentence summary of historical cases for this signal", default=""
)
critical_analysis: str = Field(
description="Critical analysis considering false positive scenarios", default=""
)
recommendation: str = Field(
description="Recommended next steps based on the analysis", default=""
)


class CaseSignalSummaryResponse(DispatchBase):
"""
Response model for case signal summary generation.

Includes the structured summary and any error messages.
"""

summary: CaseSignalSummary | None = None
error_message: str | None = None
1 change: 1 addition & 0 deletions src/dispatch/ai/prompt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# This file makes the prompt directory a Python package
65 changes: 65 additions & 0 deletions src/dispatch/ai/prompt/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from datetime import datetime
from sqlalchemy import Column, Integer, String, Boolean, UniqueConstraint

from dispatch.models import DispatchBase
from dispatch.database.core import Base
from dispatch.models import TimeStampMixin, ProjectMixin, Pagination, PrimaryKey
from dispatch.project.models import ProjectRead


class Prompt(Base, TimeStampMixin, ProjectMixin):
"""
SQLAlchemy model for AI prompts.

This model stores AI prompts that can be used for various GenAI operations
like tag recommendations, incident summaries, etc.
"""

# Columns
id = Column(Integer, primary_key=True)
genai_type = Column(Integer, nullable=False)
genai_prompt = Column(String, nullable=False)
genai_system_message = Column(String, nullable=True)
enabled = Column(Boolean, default=False, nullable=False)

# Constraints
__table_args__ = (
UniqueConstraint(
"genai_type",
"project_id",
"enabled",
name="uq_prompt_type_project_enabled",
deferrable=True,
initially="DEFERRED",
),
)


# AI Prompt Models
class PromptBase(DispatchBase):
genai_type: int | None = None
genai_prompt: str | None = None
genai_system_message: str | None = None
enabled: bool | None = None


class PromptCreate(PromptBase):
project: ProjectRead | None = None


class PromptUpdate(DispatchBase):
genai_type: int | None = None
genai_prompt: str | None = None
genai_system_message: str | None = None
enabled: bool | None = None


class PromptRead(PromptBase):
id: PrimaryKey
created_at: datetime | None = None
updated_at: datetime | None = None
project: ProjectRead | None = None


class PromptPagination(Pagination):
items: list[PromptRead]
99 changes: 99 additions & 0 deletions src/dispatch/ai/prompt/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import logging
from sqlalchemy.orm import Session

from dispatch.project import service as project_service
from dispatch.ai.enums import GenAIType
from .models import Prompt, PromptCreate, PromptUpdate

log = logging.getLogger(__name__)


def get(*, prompt_id: int, db_session: Session) -> Prompt | None:
"""Gets a prompt by its id."""
return db_session.query(Prompt).filter(Prompt.id == prompt_id).one_or_none()


def get_by_type(*, genai_type: int, project_id: int, db_session: Session) -> Prompt | None:
"""Gets an enabled prompt by its type."""
return (
db_session.query(Prompt)
.filter(Prompt.project_id == project_id)
.filter(Prompt.genai_type == genai_type)
.filter(Prompt.enabled)
.first()
)


def get_all(*, db_session: Session) -> list[Prompt | None]:
"""Gets all prompts."""
return db_session.query(Prompt)


def create(*, prompt_in: PromptCreate, db_session: Session) -> Prompt:
"""Creates prompt data."""
project = project_service.get_by_name_or_raise(
db_session=db_session, project_in=prompt_in.project
)

# If this prompt is being enabled, check if another enabled prompt of the same type exists
if prompt_in.enabled:
existing_enabled = (
db_session.query(Prompt)
.filter(Prompt.project_id == project.id)
.filter(Prompt.genai_type == prompt_in.genai_type)
.filter(Prompt.enabled)
.first()
)
if existing_enabled:
type_name = GenAIType(prompt_in.genai_type).display_name
raise ValueError(
f"Another prompt of type '{type_name}' is already enabled for this project. "
"Only one prompt per type can be enabled."
)

prompt = Prompt(**prompt_in.dict(exclude={"project"}), project=project)

db_session.add(prompt)
db_session.commit()
return prompt


def update(
*,
prompt: Prompt,
prompt_in: PromptUpdate,
db_session: Session,
) -> Prompt:
"""Updates a prompt."""
update_data = prompt_in.dict(exclude_unset=True)

# If this prompt is being enabled, check if another enabled prompt of the same type exists
if update_data.get("enabled", False):
existing_enabled = (
db_session.query(Prompt)
.filter(Prompt.project_id == prompt.project_id)
.filter(Prompt.genai_type == prompt.genai_type)
.filter(Prompt.enabled)
.filter(Prompt.id != prompt.id) # Exclude current prompt
.first()
)
if existing_enabled:
type_name = GenAIType(prompt.genai_type).display_name
raise ValueError(
f"Another prompt of type '{type_name}' is already enabled for this project. "
"Only one prompt per type can be enabled."
)

# Update only the fields that were provided in the update data
for field, value in update_data.items():
setattr(prompt, field, value)

db_session.commit()
return prompt


def delete(*, db_session, prompt_id: int):
"""Deletes a prompt."""
prompt = db_session.query(Prompt).filter(Prompt.id == prompt_id).one_or_none()
db_session.delete(prompt)
db_session.commit()
Loading
Loading