Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,099 changes: 1,099 additions & 0 deletions keras/format/data/kotlin_sequence.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion keras/format/data/kotlin_vocab.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"author": "mrco",
"version": "1.0",
"process": "PrepareTrainDataFromASTXml",
"updateDate": "2025-09-20T18:40:58.186300",
"updateDate": "2025-10-13T18:43:34.902845",
"dictionary": {
"NamespaceDeclaration_open": {
"id": 1,
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
43 changes: 43 additions & 0 deletions keras/format/format_xml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import getpass

from sequences import Dictionary, DictionaryOperations
from sequences import Sequence, SequenceOperations
from lstm_formatter import XmlOperations
from lstm_formatter import LSTMFormatter

if __name__ == "__main__":
    # Entry point: load the Kotlin token vocabulary, turn the AST XML dump
    # into id sequences, and load (or optionally train) the LSTM formatter.
    KOTLIN_VOCAB_FILE = "./data/kotlin_vocab.json"
    process = "PrepareTrainDataFromASTXml"
    inp_words = 4   # context window: tokens fed to the LSTM per prediction
    units = 96      # LSTM hidden size / embedding dimension

    sequence_operations = SequenceOperations()
    dictionary_operations = DictionaryOperations()
    xml_operations = XmlOperations()

    # Vocabulary produced by the earlier refresh step; stamped with user/process.
    dictionary = dictionary_operations.load(
        filepath=KOTLIN_VOCAB_FILE,
        username=getpass.getuser(),
        process=process
    )

    # Model filename encodes window size, vocabulary size and hidden units so
    # a vocabulary change picks up a fresh model file.
    model_filename = f"./data/lstm-kotlin-n{inp_words}_v{dictionary.size()}_u{units}.h1.keras"

    sequences = xml_operations.loadSequencesUseCase(
        directory="../../generated/kotlin/",
        filename="output_tree_Kotlin.xml",
        dictionary=dictionary
    )
    if sequences.is_err():
        print(f"Error loading sequences: {sequences.unwrap_err()}")
        exit(1)
    sequences = sequences.unwrap()
    sequences.author = getpass.getuser()
    sequences.process = process

    formatter = LSTMFormatter(inp_words=inp_words)
    if not formatter.loadModel(model_filename):
        # Fix: was an f-string with no placeholders; report which file failed.
        print(f"Error loading model: {model_filename}")
        exit(1)

    # formatter.trainModel(sequences)
    # formatter.model.save("./data/lstm-kotlin-n4.h1.keras")

1 change: 1 addition & 0 deletions keras/format/lstm_formatter/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
68 changes: 68 additions & 0 deletions keras/format/lstm_formatter/LSTMFormatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Embedding
from keras import optimizers
from tensorflow.keras.models import load_model
from sequences import Sequence
from sequences import Dictionary
import numpy as np
from tensorflow.keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint

class LSTMFormatter:
    """Next-token LSTM over formatting-token id sequences.

    The model maps a window of ``inp_words`` token ids to a probability
    distribution over the next token id (sparse categorical targets).
    """

    def __init__(self, inp_words: int = 4):
        # Number of preceding tokens the model sees per prediction.
        self.inp_words = inp_words
        # Left padding so the first real token already has a full input window
        # (0 is the reserved padding id — see mask_zero in defineModel).
        self.paddingVec = [0] * (inp_words - 1)
        self.filename = ""
        self.model = None
        self.rms = None

    def loadModel(self, filename: str) -> bool:
        """Load a saved model from ``filename``; return True on success."""
        self.filename = filename
        self.rms = optimizers.RMSprop(learning_rate=0.0005)
        if not os.path.exists(filename):
            return False
        self.model = load_model(filename)
        # Recompile so optimizer/loss match the training configuration.
        self.model.compile(optimizer=self.rms, loss='sparse_categorical_crossentropy')
        return True

    def defineModel(self, units: int, dictionary: Dictionary, filename: str):
        """Build the Embedding+LSTM model, or load it if ``filename`` exists."""
        self.filename = filename
        self.rms = optimizers.RMSprop(learning_rate=0.0005)
        if os.path.exists(filename):
            self.loadModel(filename)
            return
        dictionary_size = dictionary.size() + 1  # +1 for padding token id 0
        self.model = Sequential()
        self.model.add(Embedding(dictionary_size,
                                 output_dim=units,
                                 input_length=self.inp_words,
                                 mask_zero=True))
        self.model.add(LSTM(units))
        self.model.add(Dense(dictionary_size, activation='softmax'))
        self.model.build(input_shape=(None, self.inp_words))
        self.model.summary()
        self.model.compile(optimizer=self.rms, loss='sparse_categorical_crossentropy')

    def trainModel(self, sequence: Sequence, epochs: int = 4096, batch_size: int = 16):
        """Train on sliding windows over every sequence; return the History.

        ``epochs`` and ``batch_size`` were previously hard-coded (4096 / 16);
        they are now parameters with the same defaults, so existing callers
        are unaffected. The fit history is now returned instead of discarded.
        """
        vectors = [it['sequence'] for it in sequence.entries.values()]
        vectors = [self.paddingVec + sb for sb in vectors]
        X = []
        Y = []
        for sb in vectors:
            # Each window of inp_words ids predicts the id that follows it.
            for i in range(len(sb) - self.inp_words):
                X.append(sb[i:i + self.inp_words])
                Y.append(sb[i + self.inp_words])
        X = np.array(X)
        Y = np.array(Y)
        print(f"X shape: {X.shape}, Y shape: {Y.shape}")

        # Keep only the best model (lowest validation loss) on disk.
        checkpoint = ModelCheckpoint(self.filename, monitor='val_loss', verbose=1,
                                     save_best_only=True, mode='min')
        return self.model.fit(x=X,
                              y=Y,
                              batch_size=batch_size,
                              validation_split=0.2,
                              callbacks=[checkpoint],
                              epochs=epochs)
Copy link

Copilot AI Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded value of 4096 epochs is a magic number that should be configurable. Consider making this a parameter or class attribute to improve maintainability.

Copilot uses AI. Check for mistakes.

120 changes: 120 additions & 0 deletions keras/format/lstm_formatter/XmlOperations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from lxml import etree
from sequences import Sequence
from sequences import Dictionary
import copy
from datetime import datetime
from result import Result, Err, Ok

class XmlOperations:
    """Walks the AST XML dump to refresh the vocabulary and emit id sequences."""

    # Tags emitted as a single token (leaves — they never get a *_close token).
    OPEN_ONLY_TAGS = [
        "WorkingDirectory",
        "PackageDirectory",
        "VariableName",
        "CommentLeaf",
        "AstTypeLeaf",
        "ImportLeaf",
        "Space",
        "NlSeparator",
        "Indent",
        "Keyword"
    ]
    # Tags whose whole subtree is ignored.
    SKIP_TAGS = [
        "FileMetaInformation"
    ]

    def __init__(self):
        pass

    @staticmethod
    def _tag_tokens(child):
        """Return ``(open_token, close_token_or_None)`` for an element.

        Closable tags produce ``<tag>_open`` / ``<tag>_close``; tags in
        OPEN_ONLY_TAGS produce only the bare tag name; Keyword elements are
        specialized by their ``name`` attribute (``Keyword_<name>``).
        This logic was previously duplicated in process_childs and
        process_childs_for_sequence.
        """
        tagName = child.tag
        tagCanBeClosed = tagName not in XmlOperations.OPEN_ONLY_TAGS
        openTag = f"{tagName}_open" if tagCanBeClosed else f"{tagName}"
        if tagName == "Keyword":
            openTag = f"{tagName}_{child.attrib['name']}"
        closeTag = f"{tagName}_close" if tagCanBeClosed else None
        return openTag, closeTag

    def process_childs(self, elem, vocab, id):
        """Recursively register open/close tokens of ``elem``'s subtree.

        ``vocab`` maps token -> {"id", "priority"}; ``id`` is the next free id
        and the updated value is returned. (The parameter name shadows the
        builtin but is kept for interface compatibility.)
        """
        for child in elem:
            if child.tag in XmlOperations.SKIP_TAGS:
                ## Skip this tag and its children
                continue
            openTag, closeTag = self._tag_tokens(child)

            if openTag not in vocab:
                vocab[openTag] = {"id": id, "priority": 0}
                id += 1

            if closeTag is not None and closeTag not in vocab:
                vocab[closeTag] = {"id": id, "priority": 0}
                id += 1

            id = self.process_childs(child, vocab, id)

        return id

    def refreshDictionaryUseCase(self, directory, filename, dictionary: Dictionary) -> Dictionary:
        """Return a copy of ``dictionary`` extended with tokens found in the XML."""
        tree = etree.parse(directory + "/" + filename)
        root = tree.getroot()
        newDictionary = copy.deepcopy(dictionary)
        id = newDictionary.nextId()
        for child in root:
            print(f"Child tag: {child.tag}, attributes: {child.attrib}")
            id = self.process_childs(child, newDictionary.entries, id)

        print(f"Vocabulary size: {len(newDictionary.entries)}")
        newDictionary.updateDate = datetime.now().isoformat()
        return newDictionary

    def prepareTrainingSequencesUseCase(self, directory, filename, dictionary: Dictionary) -> Sequence:
        """Build a Sequence of token-id lists, one entry per top-level block.

        Unlike loadSequencesUseCase this propagates exceptions from missing
        vocabulary entries instead of wrapping them in a Result.
        """
        tree = etree.parse(directory + "/" + filename)
        root = tree.getroot()
        sequences = Sequence(username="", process="")

        for child in root:
            blockName = child.attrib['name']
            # print(f"Child tag: {child.tag}, attributes: {child.attrib['name']}")
            sequence = []
            self.process_childs_for_sequence(child, dictionary, sequence)
            sequences.entries[blockName] = {"sequence": sequence}

        print(f"Training sequence length: {len(sequences.entries)}")
        return sequences

    def process_childs_for_sequence(self, elem, dictionary: Dictionary, sequence: list):
        """Append token ids for ``elem``'s subtree to ``sequence`` (depth-first).

        Raises Exception when a required close token is missing from the
        vocabulary. NOTE(review): an unknown *open* token is silently skipped —
        presumably the vocabulary is refreshed beforehand; confirm against
        refreshDictionaryUseCase.
        """
        for child in elem:
            if child.tag in XmlOperations.SKIP_TAGS:
                ## Skip this tag and its children
                continue
            openTag, closeTag = self._tag_tokens(child)

            if openTag in dictionary.entries:
                sequence.append(dictionary.entries[openTag]["id"])

            if closeTag is not None:
                if closeTag in dictionary.entries:
                    # Process children first (depth-first), then the close token.
                    self.process_childs_for_sequence(child, dictionary, sequence)
                    sequence.append(dictionary.entries[closeTag]["id"])
                else:
                    raise Exception(f"'{closeTag}' not found in vocabulary")
        return sequence

    def loadSequencesUseCase(self, directory, filename, dictionary: Dictionary) -> Result[Sequence, str]:
        """Like prepareTrainingSequencesUseCase but returns Ok/Err instead of raising."""
        tree = etree.parse(directory + "/" + filename)
        root = tree.getroot()
        sequences = Sequence(username="", process="")

        for child in root:
            blockName = child.attrib['name']
            sequence = []
            try:
                self.process_childs_for_sequence(child, dictionary, sequence)
            except Exception as e:
                return Err(f"Error processing block '{blockName}': {str(e)}")
            sequences.entries[blockName] = {"sequence": sequence}

        print(f"Sequences count: {len(sequences.entries)}")
        return Ok(sequences)
6 changes: 6 additions & 0 deletions keras/format/lstm_formatter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# lstm_formatter/__init__.py

from .XmlOperations import XmlOperations
from .LSTMFormatter import LSTMFormatter

__all__ = ['XmlOperations', 'LSTMFormatter']
8 changes: 8 additions & 0 deletions keras/format/sequences/Dictionary.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ def __init__(self, username: str, process: str):
self.process = process
self.updateDate = datetime.now().isoformat()

def size(self) -> int:
    """Return the number of tokens currently in the vocabulary."""
    entry_count = len(self.entries)
    return entry_count

def nextId(self) -> int:
    """Return the smallest id larger than every existing entry id (1 when empty)."""
    existing_ids = (entry["id"] for entry in self.entries.values())
    # max(..., default=0) + 1 gives 1 for an empty vocabulary, matching the
    # explicit empty-check it replaces.
    return max(existing_ids, default=0) + 1

class DictionaryOperations:
def __init__(self):
self.data = {}
Expand Down
2 changes: 1 addition & 1 deletion keras/format/sequences/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# styxlib/__init__.py
# sequences/__init__.py

from .Dictionary import Dictionary, DictionaryOperations
from .Sequence import Sequence, SequenceOperations
Expand Down
Loading
Loading