54 changes: 34 additions & 20 deletions blog/helpers.py
@@ -1,12 +1,14 @@
import os
import enum

GIT_REPO = 'https://github.com/superlinked/VectorHub'
GIT_REPO = "https://github.com/superlinked/VectorHub"


class ItemType(enum.Enum):
FOLDER = "folder"
FILE = "file"


class Item:
def __init__(self, type, name, path, has_blogs=False, children=None):
self.type = type
@@ -36,7 +38,7 @@ def from_dict(cls, data):
name=data.get("name", ""),
path=data.get("path", ""),
has_blogs=data.get("has_blogs", False),
children=data.get("children", [])
children=data.get("children", []),
)

def to_dict(self):
@@ -62,27 +64,38 @@ def __init__(self, content, filepath, last_updated):
self.title = self.get_title()

def get_title(self) -> str:
lines = self.content.split('\n')
lines = self.content.split("\n")
first_line = str(lines[0]).strip()
if first_line.startswith('# '):
self.content = '\n'.join(lines[1:])
if first_line.startswith("# "):
self.content = "\n".join(lines[1:])
self.content = self.content.strip()
return first_line.replace('# ', '').strip()
return first_line.replace("# ", "").strip()
else:
return os.path.basename(self.filepath).replace('-', ' ').replace('_', ' ').replace('.md', '')
return (
os.path.basename(self.filepath)
.replace("-", " ")
.replace("_", " ")
.replace(".md", "")
)

def __str__(self) -> str:
return self.title

def get_github_url(self):
return f'{GIT_REPO}/blob/main/{self.filepath}'
return f"{GIT_REPO}/blob/main/{self.filepath}"

def get_filepath(self):
return self.filepath.replace('&', '').replace('--', '-').replace('__', '_')
return self.filepath.replace("&", "").replace("--", "-").replace("__", "_")

def get_slug(self):
if not self.slug_url:
slug = self.get_filepath().replace('.md', '').replace('_', '-').replace(' ', '-').replace('docs/', '')
slug = (
self.get_filepath()
.replace(".md", "")
.replace("_", "-")
.replace(" ", "-")
.replace("docs/", "")
)
self.slug_url = slug.lower()
return self.slug_url

@@ -94,14 +107,15 @@ def set_slug_url(self, slug_url):

def get_json(self):
return {
"github_url": self.get_github_url(),
"content": self.content,
"github_last_updated_date": self.last_updated,
"title": self.title,
"slug_url": self.get_slug(),
"publishedAt": self.publishedAt,
"filepath": self.get_filepath()
}
"github_url": self.get_github_url(),
"content": self.content,
"github_last_updated_date": self.last_updated,
"title": self.title,
"slug_url": self.get_slug(),
"publishedAt": self.publishedAt,
"filepath": self.get_filepath(),
"meta_desc": self.meta_desc,
}

def get_post_json(self, is_draft=False):
return {"data": self.get_json()}
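For context, a minimal usage sketch of the updated StrapiBlog. This assumes the elided __init__ initializes slug_url, publishedAt, and meta_desc (which the shown hunks rely on); the file path and content below are hypothetical:

from helpers import StrapiBlog  # assuming this runs from the blog/ directory

content = "# Hello Vectors\n\nSome body text."
blog = StrapiBlog(content, "docs/articles/Hello_Vectors.md", "2024-01-01")

print(blog.title)       # "Hello Vectors" (taken from the leading "# " heading)
print(blog.get_slug())  # "articles/hello-vectors" ("docs/" stripped, lowercased)
print(sorted(blog.get_json().keys()))
# ['content', 'filepath', 'github_last_updated_date', 'github_url',
#  'meta_desc', 'publishedAt', 'slug_url', 'title']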
171 changes: 96 additions & 75 deletions blog/main.py
@@ -11,69 +11,77 @@

args = None

BASE_URL = os.getenv('STRAPI_URL', "")
API_KEY = os.getenv('STRAPI_API_KEY', "")
BASE_URL = os.getenv("STRAPI_URL", "")
API_KEY = os.getenv("STRAPI_API_KEY", "")

paths_to_search = []
existing_filepaths_discovered = {}

headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
}
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}


def arg_parse():
global args
parser = argparse.ArgumentParser(description="VectorHub Strapi Upload")
parser.add_argument('--directories', help='Path to json which describes the directories to parse')
parser.add_argument(
"--directories", help="Path to json which describes the directories to parse"
)
args = parser.parse_args()


def load_items_from_json(directories: str) -> list:
if os.path.exists(directories):
items = []
try:
with open(directories, 'r') as file:
with open(directories, "r") as file:
data = json.load(file)
for item_data in data:
items.append(Item.from_dict(item_data))
except JSONDecodeError as e:
print('JSON Structure is invalid.')
return [Item.from_dict(item_data) for item_data in data]
except JSONDecodeError:
print("❌ Invalid JSON structure.")
exit(1)
except Exception as e:
print('Unknown error occurred.')
print("❌ Unknown error while reading directory JSON:")
print(e)
exit(1)
return items
else:
print(f"{directories} does not exist.")
exit(1)


def load_existing_blogs(page_num=1):
"""Loads all blogs currently in Strapi."""
global existing_filepaths_discovered
base_url = urljoin(BASE_URL, 'api/blogs')
search_url = base_url + f"?pagination[page]={page_num}&publicationState=preview"

session = requests.Session()
base_url = urljoin(BASE_URL, "api/blogs")
search_url = f"{base_url}?pagination[page]={page_num}&pagination[pageSize]=100"

session = requests.Session()
response = session.get(search_url, headers=headers)

if response.status_code == 200:
data = json.loads(response.text)['data']
if len(data) > 0:
for item in data:
existing_filepaths_discovered[item['attributes']['filepath']] = {'discovered': False, 'id': item['id']}
load_existing_blogs(page_num+1)
data = response.json().get("data", [])
if not data:
return
for item in data:
filepath = item.get("filepath")
if filepath:
existing_filepaths_discovered[filepath] = {
"discovered": False,
"id": item["id"],
}
load_existing_blogs(page_num + 1)
else:
print(f"⚠️ Failed to load blogs: {response.status_code} {response.text}")


def fetch_paths(node: Item, current_path=""):
"""Recursively collect directories containing blogs."""
global paths_to_search
# Update the current path with the node's path

current_path = f"{current_path}/{node.path}" if current_path else node.path

# If the node has children, recurse on each child
if node.has_blogs:
paths_to_search.append(current_path)
if node.children and len(node.children) > 0:
if node.children:
for child in node.children:
fetch_paths(child, current_path)

@@ -85,78 +93,91 @@ def find_files_to_upload(items: list):
fetch_paths(item)

files = []
extension = "md"

extension = 'md'

for path in paths_to_search:
folder_path = Path(path)
folder_files = folder_path.glob(f"*.{extension}")
for file in folder_files:
if 'readme.md' not in str(file).lower():
files.append({
'path': str(file),
'time': datetime.fromtimestamp(os.path.getmtime(file)).strftime("%Y-%m-%d")
})

for file in folder_path.glob(f"*.{extension}"):
if "readme.md" not in str(file).lower():
files.append(
{
"path": str(file),
"time": datetime.fromtimestamp(os.path.getmtime(file)).strftime(
"%Y-%m-%d"
),
}
)
return files


def build_blog_object(file_obj: dict) -> StrapiBlog:
filepath = file_obj['path']
with open(filepath, 'r') as file:
filepath = file_obj["path"]
with open(filepath, "r") as file:
content = file.read()
blog = StrapiBlog(content, filepath, file_obj['time'])
return blog
return StrapiBlog(content, filepath, file_obj["time"])


def upload_blog(blog: StrapiBlog):
base_url = urljoin(BASE_URL, 'api/blogs')
"""Uploads or updates a blog to Strapi v5."""
base_url = urljoin(BASE_URL, "api/blogs")
filepath = blog.get_filepath()
search_url = base_url + f"?filters[filepath][$eqi]={filepath}&publicationState=preview"
search_url = f"{base_url}?filters[filepath][$eqi]={filepath}"

session = requests.Session()

if filepath in existing_filepaths_discovered:
existing_filepaths_discovered[filepath]['discovered'] = True
existing_filepaths_discovered[filepath]["discovered"] = True

response = session.get(search_url, headers=headers)
if response.status_code != 200:
print(f"❌ Error fetching blog {filepath}: {response.text}")
return

existing = response.json().get("data", [])
print(f"📤 Uploading filepath: {filepath}")

if existing:
# Blog already exists
blog_id = existing[0]["documentId"]
blog.set_slug_url(existing[0].get("slug_url"))
blog.set_published_at(existing[0].get("publishedAt"))
meta_desc = existing[0].get("meta_desc")
if meta_desc:
blog.meta_desc = meta_desc
else:
blog.meta_desc = blog.title

if response.status_code == 200:
responses = json.loads(response.text)['data']
print(f'Uploading filepath: {blog.get_filepath()}')
if len(responses) > 0:
# Blog already exists at this filepath
id = json.loads(response.text)['data'][0]['id']

blog.set_slug_url(json.loads(response.text)['data'][0]['attributes']['slug_url'])
blog.set_published_at(json.loads(response.text)['data'][0]['attributes']['publishedAt'])
url = f"{base_url}/{blog_id}"
create_response = session.put(
url, headers=headers, data=json.dumps(blog.get_post_json())
)
else:
# New blog
create_response = session.post(
base_url, headers=headers, data=json.dumps(blog.get_post_json())
)

url = f"{base_url}/{id}"
create_response = session.put(url, headers=headers, data=json.dumps(blog.get_post_json()))
else:
# It's a new blog
url = base_url
create_response = session.post(url, headers=headers, data=json.dumps(blog.get_post_json()))
if create_response.status_code not in (200, 201):
print(f"❌ Failed to upload blog: {filepath}", create_response.text)
exit(1)

if not create_response.status_code == 200:
print(f'Error in parsing blog: {filepath}')
print(create_response.text)
exit(1)
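The parsing changes above track the Strapi v4 to v5 response format: v4 nested entry fields under "attributes", while v5 flattens them and adds a documentId used for updates. A hedged illustration with made-up payloads (assumptions for illustration, not actual API output):

v4_item = {"id": 7, "attributes": {"filepath": "docs/a.md", "slug_url": "a"}}
v5_item = {"id": 7, "documentId": "abc123", "filepath": "docs/a.md", "slug_url": "a"}

print(v4_item["attributes"]["filepath"])  # old nested access pattern
print(v5_item["filepath"])                # new flattened access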

def delete_old_blogs():
global existing_filepaths_discovered, BASE_URL
"""Deletes blogs that were not re-uploaded."""
global existing_filepaths_discovered

base_url = urljoin(BASE_URL, 'api/blogs')
base_url = urljoin(BASE_URL, "api/blogs")
session = requests.Session()

for filepath in existing_filepaths_discovered:
if not existing_filepaths_discovered[filepath]['discovered']:
print(f"Deleting filepath: {filepath}")
id = existing_filepaths_discovered[filepath]['id']
if id > 0:
url = f"{base_url}/{id}"
for filepath, info in existing_filepaths_discovered.items():
if not info["discovered"]:
print(f"🗑️ Deleting filepath: {filepath}")
blog_id = info["id"]
if blog_id:
url = f"{base_url}/{blog_id}"
response = session.delete(url, headers=headers)
if response.status_code != 200:
print(f'Error in deleting blog: {filepath}')
print(response.text)
if response.status_code not in (200, 204):
print(f"⚠️ Error deleting {filepath}: {response.text}")


if __name__ == "__main__":
@@ -167,10 +188,10 @@ def delete_old_blogs():

files = find_files_to_upload(items)

print('Uploading blogs')
print("📦 Uploading blogs...")
for file in tqdm(files):
blog = build_blog_object(file)
upload_blog(blog)

print('Deleting blogs')
print("🧹 Cleaning up deleted blogs...")
delete_old_blogs()
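For reference, a hedged sketch of the input this script expects. "directories.json" is a hypothetical filename, and the field values are examples mirroring the visible Item.from_dict fields (exact "type" handling sits outside the shown hunks, so treat it as an assumption):

import json

sample = [
    {
        "type": "folder",   # assumed to match the ItemType enum values
        "name": "Articles",
        "path": "docs/articles",
        "has_blogs": True,
        "children": [],     # nested items use the same shape
    }
]
with open("directories.json", "w") as f:
    json.dump(sample, f, indent=2)

# Then, with Strapi credentials in the environment:
#   STRAPI_URL=https://cms.example.com STRAPI_API_KEY=... python blog/main.py --directories directories.json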