-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
231 lines (180 loc) · 6.75 KB
/
utils.py
File metadata and controls
231 lines (180 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import os
import frontmatter
from frontmatter import Post
import logging
import argparse
import jison
import re
DEBUG = False
INT_MAX = int(2**63)
note_extensions = [".md"]
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler = logging.FileHandler("app.log")
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
logger.addHandler(handler)
para_folders = ["projects", "areas", "resources", "archive", "zettelkasten"]
# banned_folders = []
banned_folders = ["sam-thomas-second-brain"]
project_name = "RELATED_NOTES_BOT"
bottom_matter_header = f"\n---\n\n**{project_name}**"
def get_all_notes(root_directory):
for root, dirs, files in os.walk(root_directory, followlinks=True):
for file in files:
if any(file.endswith(ext) for ext in note_extensions):
yield os.path.join(root, file)
def read_note(note_path):
"""
Reads the note and returns the content
"""
assert os.path.exists(note_path)
assert os.path.isfile(note_path)
assert note_path.endswith(".md")
try:
with open(note_path, "r") as f:
file_content = f.read()
except Exception as e:
print(f"Error reading file {note_path}")
return None
return file_content
def parse_note(raw_content):
"""
Pares the note to extract the meta data and content
"""
# Extract the meta data
meta_data, content = frontmatter.parse(raw_content)
return meta_data, content
def remove_bottom_matter(content) -> str:
if bottom_matter_header in content:
content = content.split(bottom_matter_header)[0]
return content
def update_bottom_matter(content, data):
if content is None:
return False
content = remove_bottom_matter(content)
content += bottom_matter_header
content += "\n"
for key, value in data.items():
if type(value) == list:
value = ", ".join(value)
content += f"\n{key}: {value}"
if len(data) > 0:
content += "\n"
return content
def loop_through_notes(
root_directory, functions, max_notes=INT_MAX, clear_bottom_matter=False
):
notes = get_all_notes(root_directory)
num_notes_processed = 0
for note in notes:
if num_notes_processed >= max_notes:
return
parent_folders = note.split(root_directory)[1].split("/")[1:][:-1]
if not any([folder in para_folders for folder in parent_folders]):
continue
if any([folder in banned_folders for folder in parent_folders]):
continue
# don't care about anything in archive
if "archive" in parent_folders:
continue
raw_content = read_note(note)
if len(raw_content) == 0:
continue # empyt note
meta_data, content = parse_note(raw_content)
bottom_matter_data = {}
if clear_bottom_matter:
content = remove_bottom_matter(content)
for function in functions:
new_bottom_matter_data, new_content, did_update_content = function(
note, meta_data, content, root_directory
)
if new_bottom_matter_data:
bottom_matter_data.update(new_bottom_matter_data)
if did_update_content:
content = new_content
if bottom_matter_data and bottom_matter_data != {}:
content = update_bottom_matter(content, bottom_matter_data)
try:
with open(note, "w") as f:
if meta_data is {} or len(meta_data) == 0 or meta_data is None:
f.write(content)
else:
f.write(frontmatter.dumps(Post(content, **meta_data)))
except Exception as e:
logger.error(f"Error writing to file {note}: {e}")
# Replace with old content
with open(note, "w") as f:
f.write(raw_content)
num_notes_processed += 1
def process_note(note, root_directory, functions, clear_bottom_matter=False):
raw_content = read_note(note)
if len(raw_content) == 0:
return # empyt note
meta_data, content = parse_note(raw_content)
bottom_matter_data = {}
if clear_bottom_matter:
content = remove_bottom_matter(content)
content_was_updated = False
for function in functions:
new_bottom_matter_data, new_content, did_update_content = function(
note, meta_data, content, root_directory
)
if new_bottom_matter_data:
bottom_matter_data.update(new_bottom_matter_data)
if did_update_content:
content_was_updated = True
content = new_content
if bottom_matter_data and bottom_matter_data != {}:
content = update_bottom_matter(content, bottom_matter_data)
try:
if not content_was_updated and bottom_matter_data == {}:
return
with open(note, "w") as f:
if meta_data is {} or len(meta_data) == 0 or meta_data is None:
f.write(content)
else:
f.write(frontmatter.dumps(Post(content, **meta_data)))
except Exception as e:
logger.error(f"Error writing to file {note}: {e}")
# Replace with old content
with open(note, "w") as f:
f.write(raw_content)
def loop_through_directories(
root_directory, functions, max_notes=INT_MAX, clear_bottom_matter=False
):
notes = get_all_notes(root_directory)
num_notes_processed = 0
for note in notes:
print("Working on note: ", note)
if num_notes_processed >= max_notes:
return
process_note(note, root_directory, functions, clear_bottom_matter)
num_notes_processed += 1
def clean_custom_patterns(text):
# TODO: Thoroughly test these
# Replace: email, phone, youtube link, regular link with [email], [phone], [youtube], [link]
clean_text = re.sub( # email
r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)", "<EMAIL>", text
)
clean_text = re.sub( # phone
r"(\d{3}[-\.\s]??\d{3}[-\.\s]??\d{4}|\(\d{3}\)\s*\d{3}[-\.\s]??\d{4}|\d{3}[-\.\s]??\d{4})",
"<PHONE>",
clean_text,
)
# clean_text = re.sub( # youtube link
# r"(https?:\/\/)?(www\.)?(youtube\.com|youtu\.?be)\/.+", "", clean_text
# )
clean_text = re.sub( # regular link
r"(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)",
"<URL>",
clean_text,
)
return clean_text
def normalize_whitespace(text):
lines = text.split("\n")
normalized_lines = [" ".join(line.split()) for line in lines]
return "\n".join(normalized_lines)
def clean_text_for_embedding_model(text):
return clean_custom_patterns(normalize_whitespace(text))