2 changes: 1 addition & 1 deletion .github/workflows/playwright.yml
@@ -36,7 +36,7 @@ jobs:
python-version: 3.11
- uses: actions/setup-node@v4
with:
-         node-version: 16
+         node-version: 18
- uses: actions/cache@v4
with:
path: ~/.cache/pip
13 changes: 13 additions & 0 deletions src/dispatch/ai/enums.py
@@ -0,0 +1,13 @@
from dispatch.enums import DispatchEnum


class AIEventSource(DispatchEnum):
"""Source identifiers for AI-generated events."""

dispatch_genai = "Dispatch GenAI"


class AIEventDescription(DispatchEnum):
"""Description templates for AI-generated events."""

read_in_summary_created = "AI-generated read-in summary created for {participant_email}"
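DispatchEnum is used below as a string-valued enum: members are passed directly as event sources and formatted with str.format in service.py. A minimal usage sketch, assuming DispatchEnum subclasses str (the email address is a hypothetical placeholder):

    from dispatch.ai.enums import AIEventSource, AIEventDescription

    # the member's value is the string recorded as the event source
    source = AIEventSource.dispatch_genai  # "Dispatch GenAI"

    # the template is a str, so str.format fills in the placeholder
    description = AIEventDescription.read_in_summary_created.format(
        participant_email="analyst@example.com"  # hypothetical address
    )
    # -> "AI-generated read-in summary created for analyst@example.com"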
36 changes: 36 additions & 0 deletions src/dispatch/ai/models.py
@@ -0,0 +1,36 @@
from typing import List
from pydantic import Field

from dispatch.models import DispatchBase


class ReadInSummary(DispatchBase):
"""
Model for structured read-in summary output from AI analysis.

This model ensures the AI response is properly structured with timeline,
actions taken, and current status sections.
"""

timeline: List[str] = Field(
description="Chronological list of key events and decisions", default_factory=list
)
actions_taken: List[str] = Field(
description="List of actions that were taken to address the security event",
default_factory=list,
)
current_status: str = Field(
description="Current status of the security event and any unresolved issues", default=""
)
summary: str = Field(description="Overall summary of the security event", default="")


class ReadInSummaryResponse(DispatchBase):
"""
Response model for read-in summary generation.

Includes the structured summary and any error messages.
"""

summary: ReadInSummary | None = None
error_message: str | None = None
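A minimal usage sketch of these models (field values are illustrative; serialization mirrors the pydantic v1-style .dict() calls used in service.py below):

    summary = ReadInSummary(
        timeline=["2025-01-01 09:15 Alert triaged", "2025-01-01 09:40 Host isolated"],
        actions_taken=["Revoked the exposed credential"],
        current_status="Contained; credential rotation in progress",
        summary="Credential exposure detected and contained within one hour.",
    )
    response = ReadInSummaryResponse(summary=summary)
    details = response.summary.dict()  # plain dict, suitable for event details

Note that the `ReadInSummary | None` union syntax requires Python 3.10 or newer.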
188 changes: 164 additions & 24 deletions src/dispatch/ai/service.py
@@ -1,6 +1,7 @@
import json
import logging

+from dispatch.plugins.dispatch_slack.models import IncidentSubjects
import tiktoken
from sqlalchemy.orm import aliased, Session

@@ -9,16 +10,25 @@
from dispatch.enums import Visibility
from dispatch.incident.models import Incident
from dispatch.plugin import service as plugin_service
from dispatch.project.models import Project
from dispatch.signal import service as signal_service
from dispatch.tag.models import Tag, TagRecommendationResponse
from dispatch.tag_type.models import TagType
from dispatch.case import service as case_service
from dispatch.incident import service as incident_service
from dispatch.types import Subject
from dispatch.event import service as event_service
from dispatch.enums import EventType

from .exceptions import GenAIException
from .models import ReadInSummary, ReadInSummaryResponse
from .enums import AIEventSource, AIEventDescription

log = logging.getLogger(__name__)

# Cache duration for AI-generated read-in summaries (in seconds)
READ_IN_SUMMARY_CACHE_DURATION = 120 # 2 minutes


def get_model_token_limit(model_name: str, buffer_percentage: float = 0.05) -> int:
"""
@@ -101,6 +111,18 @@ def truncate_prompt(
return truncated_prompt


def prepare_prompt_for_model(prompt: str, model_name: str) -> str:
"""
Tokenizes and truncates the prompt if it exceeds the model's token limit.
Returns a prompt string that is safe to send to the model.
"""
tokenized_prompt, num_tokens, encoding = num_tokens_from_string(prompt, model_name)
model_token_limit = get_model_token_limit(model_name)
if num_tokens > model_token_limit:
prompt = truncate_prompt(tokenized_prompt, num_tokens, encoding, model_token_limit)
return prompt
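For reference, num_tokens_from_string and truncate_prompt are defined earlier in this module and are not shown in this hunk; a minimal sketch of the expected behavior, assuming a tiktoken-based implementation:

    import tiktoken

    def num_tokens_from_string(prompt: str, model_name: str):
        # encode the prompt and return the tokens, their count, and the encoding
        encoding = tiktoken.encoding_for_model(model_name)
        tokens = encoding.encode(prompt)
        return tokens, len(tokens), encoding

    def truncate_prompt(tokenized_prompt, num_tokens, encoding, model_token_limit):
        # num_tokens exceeds model_token_limit at the call site; keep only as
        # many tokens as the limit allows, then decode back to text
        return encoding.decode(tokenized_prompt[:model_token_limit])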


def generate_case_signal_historical_context(case: Case, db_session: Session) -> str:
"""
Generate historical context for a case stemming from a signal, including related cases and relevant data.
@@ -278,17 +300,10 @@ def generate_case_signal_summary(case: Case, db_session: Session) -> dict[str, s
</historical_context>
"""

-    tokenized_prompt, num_tokens, encoding = num_tokens_from_string(
+    prompt = prepare_prompt_for_model(
         prompt, genai_plugin.instance.configuration.chat_completion_model
     )

-    # we check if the prompt exceeds the token limit
-    model_token_limit = get_model_token_limit(
-        genai_plugin.instance.configuration.chat_completion_model
-    )
-    if num_tokens > model_token_limit:
-        prompt = truncate_prompt(tokenized_prompt, num_tokens, encoding, model_token_limit)

# we generate the analysis
response = genai_plugin.instance.chat_completion(prompt=prompt)

@@ -372,23 +387,26 @@ def generate_incident_summary(incident: Incident, db_session: Session) -> str:
{pir_doc}
"""

-    tokenized_prompt, num_tokens, encoding = num_tokens_from_string(
+    prompt = prepare_prompt_for_model(
         prompt, genai_plugin.instance.configuration.chat_completion_model
    )

-    # we check if the prompt exceeds the token limit
-    model_token_limit = get_model_token_limit(
-        genai_plugin.instance.configuration.chat_completion_model
-    )
-    if num_tokens > model_token_limit:
-        prompt = truncate_prompt(tokenized_prompt, num_tokens, encoding, model_token_limit)

summary = genai_plugin.instance.chat_completion(prompt=prompt)

incident.summary = summary
db_session.add(incident)
db_session.commit()

+    # Log the AI summary generation event
+    event_service.log_incident_event(
+        db_session=db_session,
+        source="Dispatch Core App",
+        description="AI-generated incident summary created",
+        incident_id=incident.id,
+        details={"summary": summary},
+        type=EventType.other,
+    )

return summary

except Exception as e:
@@ -529,17 +547,10 @@ def get_tag_recommendations(

prompt += f"** Tags you can use: {tag_list} \n ** Security event details: {resources}"

-    tokenized_prompt, num_tokens, encoding = num_tokens_from_string(
+    prompt = prepare_prompt_for_model(
         prompt, genai_plugin.instance.configuration.chat_completion_model
    )

-    # we check if the prompt exceeds the token limit
-    model_token_limit = get_model_token_limit(
-        genai_plugin.instance.configuration.chat_completion_model
-    )
-    if num_tokens > model_token_limit:
-        prompt = truncate_prompt(tokenized_prompt, num_tokens, encoding, model_token_limit)

try:
result = genai_plugin.instance.chat_completion(prompt=prompt)

@@ -560,3 +571,132 @@
log.exception(f"Error generating tag recommendations: {e}")
message = "AI tag suggestions encountered an error. Please try again later."
return TagRecommendationResponse(recommendations=[], error_message=message)


def generate_read_in_summary(
*,
db_session: Session,
subject: Subject,
project: Project,
channel_id: str,
important_reaction: str,
participant_email: str = "",
) -> ReadInSummaryResponse:
"""
Generate a read-in summary for a subject.

Args:
db_session (Session): The database session used for lookups and event logging.
subject (Subject): The subject object for which the read-in summary is being generated.
project (Project): The project context.
channel_id (str): The channel ID to get conversation from.
important_reaction (str): The reaction to filter important messages.
participant_email (str): The email of the participant for whom the summary was generated.

Returns:
ReadInSummaryResponse: A structured response containing the read-in summary or error message.
"""
subject_type = subject.type

# Check for recent summary event
if subject_type == IncidentSubjects.incident:
recent_event = event_service.get_recent_summary_event(
db_session, incident_id=subject.id, max_age_seconds=READ_IN_SUMMARY_CACHE_DURATION
)
else:
recent_event = event_service.get_recent_summary_event(
db_session, case_id=subject.id, max_age_seconds=READ_IN_SUMMARY_CACHE_DURATION
)

if recent_event and recent_event.details:
try:
summary = ReadInSummary(**recent_event.details)
return ReadInSummaryResponse(summary=summary)
except Exception as e:
log.warning(
f"Failed to parse cached summary from event {recent_event.id}: {e}. Generating new summary."
)

# Don't generate if no enabled ai plugin or storage plugin
genai_plugin = plugin_service.get_active_instance(
db_session=db_session, plugin_type="artificial-intelligence", project_id=project.id
)
if not genai_plugin:
message = f"Read-in summary not generated for {subject.name}. No artificial-intelligence plugin enabled."
log.warning(message)
return ReadInSummaryResponse(error_message=message)

conversation_plugin = plugin_service.get_active_instance(
db_session=db_session, plugin_type="conversation", project_id=project.id
)
if not conversation_plugin:
message = (
f"Read-in summary not generated for {subject.name}. No conversation plugin enabled."
)
log.warning(message)
return ReadInSummaryResponse(error_message=message)

conversation = conversation_plugin.instance.get_conversation(
conversation_id=channel_id, important_reaction=important_reaction
)
if not conversation:
message = f"Read-in summary not generated for {subject.name}. No conversation found."
log.warning(message)
return ReadInSummaryResponse(error_message=message)

system_message = """You are a cybersecurity analyst tasked with creating structured read-in summaries.
Analyze the provided channel messages and extract key information about a security event.
Focus on identifying:
1. Timeline: Chronological list of key events and decisions (skip channel join/remove messages)
- For all timeline events, format timestamps as YYYY-MM-DD HH:MM (no seconds, no 'T').
2. Actions taken: List of actions that were taken to address the security event
3. Current status: Current status of the security event and any unresolved issues
4. Summary: Overall summary of the security event

Only include the most relevant events and outcomes. Be clear and concise."""

prompt = f"""Analyze the following channel messages regarding a security event and provide a structured summary.

Channel messages: {conversation}
"""

prompt = prepare_prompt_for_model(
prompt, genai_plugin.instance.configuration.chat_completion_model
)

try:
result = genai_plugin.instance.chat_parse(
prompt=prompt, response_model=ReadInSummary, system_message=system_message
)

# Log the AI read-in summary generation event
if subject_type == IncidentSubjects.incident:
# This is an incident
event_service.log_incident_event(
db_session=db_session,
source=AIEventSource.dispatch_genai,
description=AIEventDescription.read_in_summary_created.format(
participant_email=participant_email
),
incident_id=subject.id,
details=result.dict(),
type=EventType.other,
)
else:
# This is a case
event_service.log_case_event(
db_session=db_session,
source=AIEventSource.dispatch_genai,
description=AIEventDescription.read_in_summary_created.format(
participant_email=participant_email
),
case_id=subject.id,
details=result.dict(),
type=EventType.other,
)

return ReadInSummaryResponse(summary=result)

except Exception as e:
log.exception(f"Error generating read-in summary: {e}")
error_msg = f"Error generating read-in summary: {str(e)}"
return ReadInSummaryResponse(error_message=error_msg)
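A usage sketch for the new entry point (the channel ID, reaction name, and email are hypothetical placeholders):

    response = generate_read_in_summary(
        db_session=db_session,
        subject=incident,  # an Incident or Case
        project=incident.project,
        channel_id="C0123456789",  # hypothetical Slack channel ID
        important_reaction="pushpin",  # hypothetical emoji reaction name
        participant_email="analyst@example.com",  # hypothetical address
    )
    if response.error_message:
        log.warning(response.error_message)
    elif response.summary:
        log.info(response.summary.summary)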