Skip to content

Commit 818cc73

Browse files
committed
update mapping selection flow
1 parent 578d453 commit 818cc73

File tree

16 files changed

+88
-311
lines changed

16 files changed

+88
-311
lines changed

uncoder-core/app/translator/core/mapping.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from abc import ABC, abstractmethod
4-
from typing import TYPE_CHECKING, Optional, TypeVar
4+
from typing import TYPE_CHECKING, Optional, TypeVar, Union
55

66
from app.translator.core.exceptions.core import StrictPlatformException
77
from app.translator.core.models.platform_details import PlatformDetails
@@ -19,9 +19,14 @@ class LogSourceSignature(ABC):
1919
wildcard_symbol = "*"
2020

2121
@abstractmethod
22-
def is_suitable(self, *args, **kwargs) -> bool:
22+
def is_suitable(self, **kwargs) -> bool:
2323
raise NotImplementedError("Abstract method")
2424

25+
@staticmethod
26+
def _check_conditions(conditions: list[Union[bool, None]]) -> bool:
27+
conditions = [condition for condition in conditions if condition is not None]
28+
return bool(conditions) and all(conditions)
29+
2530
@abstractmethod
2631
def __str__(self) -> str:
2732
raise NotImplementedError("Abstract method")
@@ -147,9 +152,23 @@ def prepare_fields_mapping(field_mapping: dict) -> FieldsMapping:
147152
def prepare_log_source_signature(self, mapping: dict) -> LogSourceSignature:
148153
raise NotImplementedError("Abstract method")
149154

150-
@abstractmethod
151-
def get_suitable_source_mappings(self, *args, **kwargs) -> list[SourceMapping]:
152-
raise NotImplementedError("Abstract method")
155+
def get_suitable_source_mappings(
156+
self, field_names: list[str], log_sources: dict[str, list[Union[int, str]]]
157+
) -> list[SourceMapping]:
158+
by_log_sources_and_fields = []
159+
by_fields = []
160+
for source_mapping in self._source_mappings.values():
161+
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
162+
continue
163+
164+
if source_mapping.fields_mapping.is_suitable(field_names):
165+
by_fields.append(source_mapping)
166+
167+
log_source_signature: LogSourceSignature = source_mapping.log_source_signature
168+
if log_source_signature.is_suitable(**log_sources):
169+
by_log_sources_and_fields.append(source_mapping)
170+
171+
return by_log_sources_and_fields or by_fields or [self._source_mappings[DEFAULT_MAPPING_NAME]]
153172

154173
def get_source_mapping(self, source_id: str) -> Optional[SourceMapping]:
155174
return self._source_mappings.get(source_id)

uncoder-core/app/translator/core/parser.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,30 +62,24 @@ def get_query_tokens(self, query: str) -> list[QUERY_TOKEN_TYPE]:
6262
raise TokenizerGeneralException("Can't translate empty query. Please provide more details")
6363
return self.tokenizer.tokenize(query=query)
6464

65+
@staticmethod
6566
def get_field_tokens(
66-
self, query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None
67+
query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None
6768
) -> list[Field]:
6869
field_tokens = []
6970
for token in query_tokens:
70-
if isinstance(token, FieldValue):
71-
field_tokens.append(token.field)
72-
elif isinstance(token, FieldField):
73-
if token.field_left:
74-
field_tokens.append(token.field_left)
75-
if token.field_right:
76-
field_tokens.append(token.field_right)
77-
elif isinstance(token, FunctionValue):
78-
field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args([token.function]))
71+
if isinstance(token, (FieldField, FieldValue, FunctionValue)):
72+
field_tokens.extend(token.fields)
7973

8074
if functions:
81-
field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args(functions))
75+
field_tokens.extend([field for func in functions for field in func.fields])
8276

8377
return field_tokens
8478

8579
def get_source_mappings(
86-
self, field_tokens: list[Field], log_sources: dict[str, Union[str, list[str]]]
80+
self, field_tokens: list[Field], log_sources: dict[str, list[Union[int, str]]]
8781
) -> list[SourceMapping]:
8882
field_names = [field.source_name for field in field_tokens]
89-
source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources)
83+
source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources)
9084
self.tokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping)
9185
return source_mappings
Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Optional
22

3-
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
3+
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
44
from app.translator.platforms.athena.const import athena_query_details
55

66

@@ -22,23 +22,5 @@ def prepare_log_source_signature(self, mapping: dict) -> AthenaLogSourceSignatur
2222
default_log_source = mapping["default_log_source"]
2323
return AthenaLogSourceSignature(tables=tables, default_source=default_log_source)
2424

25-
def get_suitable_source_mappings(self, field_names: list[str], table: Optional[str]) -> list[SourceMapping]:
26-
suitable_source_mappings = []
27-
for source_mapping in self._source_mappings.values():
28-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
29-
continue
30-
31-
log_source_signature: AthenaLogSourceSignature = source_mapping.log_source_signature
32-
if table and log_source_signature.is_suitable(table=table):
33-
if source_mapping.fields_mapping.is_suitable(field_names):
34-
suitable_source_mappings.append(source_mapping)
35-
elif source_mapping.fields_mapping.is_suitable(field_names):
36-
suitable_source_mappings.append(source_mapping)
37-
38-
if not suitable_source_mappings:
39-
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
40-
41-
return suitable_source_mappings
42-
4325

4426
athena_query_mappings = AthenaMappings(platform_dir="athena", platform_details=athena_query_details)
Lines changed: 11 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Optional
22

3-
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
3+
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
44

55

66
class AQLLogSourceSignature(LogSourceSignature):
@@ -20,23 +20,18 @@ def __init__(
2020

2121
def is_suitable(
2222
self,
23-
devicetype: Optional[list[int]],
24-
category: Optional[list[int]],
25-
qid: Optional[list[int]],
26-
qideventcategory: Optional[list[int]],
23+
devicetype: Optional[list[int]] = None,
24+
category: Optional[list[int]] = None,
25+
qid: Optional[list[int]] = None,
26+
qideventcategory: Optional[list[int]] = None,
2727
) -> bool:
28-
device_type_match = set(devicetype).issubset(self.device_types) if devicetype else None
29-
category_match = set(category).issubset(self.categories) if category else None
30-
qid_match = set(qid).issubset(self.qids) if qid else None
31-
qid_event_category_match = (
32-
set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None
33-
)
34-
all_conditions = [
35-
condition
36-
for condition in (device_type_match, category_match, qid_match, qid_event_category_match)
37-
if condition is not None
28+
conditions = [
29+
set(devicetype).issubset(self.device_types) if devicetype else None,
30+
set(category).issubset(self.categories) if category else None,
31+
set(qid).issubset(self.qids) if qid else None,
32+
set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None,
3833
]
39-
return bool(all_conditions) and all(all_conditions)
34+
return self._check_conditions(conditions)
4035

4136
def __str__(self) -> str:
4237
return self._default_source.get("table", "events")
@@ -61,33 +56,3 @@ def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature:
6156
qid_event_categories=log_source.get("qideventcategory"),
6257
default_source=default_log_source,
6358
)
64-
65-
def get_suitable_source_mappings(
66-
self,
67-
field_names: list[str],
68-
devicetype: Optional[list[int]] = None,
69-
category: Optional[list[int]] = None,
70-
qid: Optional[list[int]] = None,
71-
qideventcategory: Optional[list[int]] = None,
72-
) -> list[SourceMapping]:
73-
suitable_source_mappings = []
74-
for source_mapping in self._source_mappings.values():
75-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
76-
continue
77-
78-
log_source_signature: AQLLogSourceSignature = source_mapping.log_source_signature
79-
if log_source_signature.is_suitable(devicetype, category, qid, qideventcategory): # noqa: SIM102
80-
if source_mapping.fields_mapping.is_suitable(field_names):
81-
suitable_source_mappings.append(source_mapping)
82-
83-
if not suitable_source_mappings:
84-
for source_mapping in self._source_mappings.values():
85-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
86-
continue
87-
if source_mapping.fields_mapping.is_suitable(field_names):
88-
suitable_source_mappings.append(source_mapping)
89-
90-
if not suitable_source_mappings:
91-
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
92-
93-
return suitable_source_mappings
Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
from typing import Optional
22

3-
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
3+
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
44

55

66
class LuceneLogSourceSignature(LogSourceSignature):
77
def __init__(self, indices: Optional[list[str]], default_source: dict):
88
self.indices = set(indices or [])
99
self._default_source = default_source or {}
1010

11-
def is_suitable(self, index: Optional[list[str]]) -> bool:
12-
return set(index or []).issubset(self.indices)
11+
def is_suitable(self, index: Optional[list[str]] = None, **kwargs) -> bool: # noqa: ARG002
12+
return self._check_conditions([set(index).issubset(self.indices) if index else None])
1313

1414
def __str__(self) -> str:
1515
return self._default_source.get("index", "")
@@ -20,26 +20,3 @@ def prepare_log_source_signature(self, mapping: dict) -> LuceneLogSourceSignatur
2020
indices = mapping.get("log_source", {}).get("index")
2121
default_log_source = mapping.get("default_log_source", {})
2222
return LuceneLogSourceSignature(indices=indices, default_source=default_log_source)
23-
24-
def get_suitable_source_mappings(
25-
self,
26-
field_names: list[str],
27-
index: Optional[list[str]] = None,
28-
**kwargs, # noqa: ARG002
29-
) -> list[SourceMapping]:
30-
suitable_source_mappings = []
31-
for source_mapping in self._source_mappings.values():
32-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
33-
continue
34-
35-
log_source_signature: LuceneLogSourceSignature = source_mapping.log_source_signature
36-
if index and log_source_signature.is_suitable(index=index):
37-
if source_mapping.fields_mapping.is_suitable(field_names):
38-
suitable_source_mappings.append(source_mapping)
39-
elif source_mapping.fields_mapping.is_suitable(field_names):
40-
suitable_source_mappings.append(source_mapping)
41-
42-
if not suitable_source_mappings:
43-
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
44-
45-
return suitable_source_mappings

uncoder-core/app/translator/platforms/base/sql/parsers/sql.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
"""
1818

1919
import re
20-
from typing import Optional
2120

2221
from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer
2322
from app.translator.core.parser import PlatformQueryParser
@@ -31,12 +30,12 @@ class SqlQueryParser(PlatformQueryParser):
3130

3231
wrapped_with_comment_pattern = r"^\s*--.*(?:\n|$)"
3332

34-
def _parse_query(self, query: str) -> tuple[str, dict[str, Optional[str]]]:
35-
log_source = {"table": None}
33+
def _parse_query(self, query: str) -> tuple[str, dict[str, list[str]]]:
34+
log_source = {"table": []}
3635
if re.search(self.query_delimiter_pattern, query, flags=re.IGNORECASE):
3736
table_search = re.search(self.table_pattern, query)
3837
table = table_search.group("table")
39-
log_source["table"] = table
38+
log_source["table"] = [table]
4039
return re.split(self.query_delimiter_pattern, query, flags=re.IGNORECASE)[1], log_source
4140

4241
return query, log_source
Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
1+
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
22
from app.translator.platforms.chronicle.const import chronicle_query_details, chronicle_rule_details
33

44

55
class ChronicleLogSourceSignature(LogSourceSignature):
66
def is_suitable(self) -> bool:
7-
raise NotImplementedError
7+
raise True
88

99
def __str__(self) -> str:
1010
return ""
@@ -16,20 +16,6 @@ class ChronicleMappings(BasePlatformMappings):
1616
def prepare_log_source_signature(self, mapping: dict) -> ChronicleLogSourceSignature:
1717
...
1818

19-
def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]:
20-
suitable_source_mappings = []
21-
for source_mapping in self._source_mappings.values():
22-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
23-
continue
24-
25-
if source_mapping.fields_mapping.is_suitable(field_names):
26-
suitable_source_mappings.append(source_mapping)
27-
28-
if not suitable_source_mappings:
29-
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
30-
31-
return suitable_source_mappings
32-
3319

3420
chronicle_query_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_query_details)
3521
chronicle_rule_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_rule_details)
Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Optional
22

3-
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
3+
from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
44
from app.translator.platforms.crowdstrike.const import crowdstrike_query_details
55

66

@@ -9,8 +9,9 @@ def __init__(self, event_simple_name: Optional[list[str]], default_source: dict)
99
self.event_simple_names = set(event_simple_name or [])
1010
self._default_source = default_source or {}
1111

12-
def is_suitable(self, event_simple_name: list[str]) -> bool:
13-
return set(event_simple_name).issubset(self.event_simple_names)
12+
def is_suitable(self, event_simple_name: Optional[list[str]] = None) -> bool:
13+
conditions = [set(event_simple_name).issubset(self.event_simple_names) if event_simple_name else None]
14+
return self._check_conditions(conditions)
1415

1516
def __str__(self) -> str:
1617
return f"event_simpleName={self._default_source['event_simpleName']}"
@@ -24,19 +25,5 @@ def prepare_log_source_signature(self, mapping: dict) -> CrowdStrikeLogSourceSig
2425
event_simple_name=log_source.get("event_simpleName"), default_source=default_log_source
2526
)
2627

27-
def get_suitable_source_mappings(self, field_names: list[str], event_simpleName: list[str]) -> list[SourceMapping]: # noqa: N803
28-
suitable_source_mappings = []
29-
for source_mapping in self._source_mappings.values():
30-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
31-
continue
32-
33-
source_signature: CrowdStrikeLogSourceSignature = source_mapping.log_source_signature
34-
if source_signature.is_suitable(
35-
event_simple_name=event_simpleName
36-
) and source_mapping.fields_mapping.is_suitable(field_names):
37-
suitable_source_mappings.append(source_mapping)
38-
39-
return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]]
40-
4128

4229
crowdstrike_query_mappings = CrowdstrikeMappings(platform_dir="crowdstrike", platform_details=crowdstrike_query_details)
Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
from typing import Optional
22

3-
from app.translator.core.mapping import (
4-
DEFAULT_MAPPING_NAME,
5-
BaseCommonPlatformMappings,
6-
LogSourceSignature,
7-
SourceMapping,
8-
)
3+
from app.translator.core.mapping import BaseCommonPlatformMappings, LogSourceSignature
94
from app.translator.platforms.forti_siem.const import forti_siem_rule_details
105

116

@@ -14,8 +9,8 @@ def __init__(self, event_types: Optional[list[str]], default_source: dict):
149
self.event_types = set(event_types or [])
1510
self._default_source = default_source or {}
1611

17-
def is_suitable(self, event_type: str) -> bool:
18-
return event_type in self.event_types
12+
def is_suitable(self, event_type: Optional[list[str]] = None) -> bool:
13+
return self._check_conditions([set(event_type).issubset(self.event_types) if event_type else None])
1914

2015
def __str__(self) -> str:
2116
event_type = self._default_source.get("eventType", "")
@@ -39,23 +34,5 @@ def prepare_log_source_signature(self, mapping: dict) -> FortiSiemLogSourceSigna
3934
default_log_source = mapping["default_log_source"]
4035
return FortiSiemLogSourceSignature(event_types=event_types, default_source=default_log_source)
4136

42-
def get_suitable_source_mappings(self, field_names: list[str], event_type: Optional[str]) -> list[SourceMapping]:
43-
suitable_source_mappings = []
44-
for source_mapping in self._source_mappings.values():
45-
if source_mapping.source_id == DEFAULT_MAPPING_NAME:
46-
continue
47-
48-
log_source_signature: FortiSiemLogSourceSignature = source_mapping.log_source_signature
49-
if event_type and log_source_signature.is_suitable(event_type=event_type):
50-
if source_mapping.fields_mapping.is_suitable(field_names):
51-
suitable_source_mappings.append(source_mapping)
52-
elif source_mapping.fields_mapping.is_suitable(field_names):
53-
suitable_source_mappings.append(source_mapping)
54-
55-
if not suitable_source_mappings:
56-
suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
57-
58-
return suitable_source_mappings
59-
6037

6138
forti_siem_rule_mappings = FortiSiemMappings(platform_dir="forti_siem", platform_details=forti_siem_rule_details)

0 commit comments

Comments
 (0)