Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ on:

jobs:
call_ci:
uses: EffectiveRange/ci-workflows/.github/workflows/python-ci.yaml@v5
uses: EffectiveRange/ci-workflows/.github/workflows/python-ci.yaml@latest-python
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"args": [
"--backend=scipy"
],
"console": "integratedTerminal"
}
]
}
39 changes: 18 additions & 21 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
{
"editor.rulers": [
120
],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
},
"black-formatter.args": [
"--skip-string-normalization",
"--line-length",
"120"
],
"python.testing.unittestArgs": [
"-v",
"-s",
"./tests",
"-p",
"*test.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
"python.venvPath": "${workspaceFolder}/.venv",
"python.testing.unittestArgs": [
"-v",
"-s",
"./tests",
"-p",
"*test.py"
],
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"black-formatter.interpreter": [
"${workspaceFolder}/.venv/bin/python3"
],
"black-formatter.args": [
"--config=setup.cfg"
],
"python.analysis.typeCheckingMode": "standard",
"python.testing.pytestArgs": []
}
15 changes: 15 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Create Python venv",
"type": "shell",
"command": "if [ -d /var/chroot/buildroot ];then dpkgdeps -v --arch $(grep TARGET_ARCH /home/crossbuilder/target/target | cut -d'=' -f2 | tr -d \\') .;else dpkgdeps -v .;fi && rm -rf .venv && python3 -m venv --system-site-packages .venv && .venv/bin/pip install -e . && .venv/bin/python3 -m mypy --non-interactive --install-types && .venv/bin/pip install pytest-cov || true",
"group": "build",
"detail": "Creates a Python virtual environment in the .venv folder",
"problemMatcher": [
"$eslint-compact"
]
}
]
}
35 changes: 22 additions & 13 deletions common_utility/configLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
# SPDX-License-Identifier: MIT

import os
import shutil
from configparser import ConfigParser
from pathlib import Path
from typing import Any

from context_logger import get_logger

from common_utility import copy_file

log = get_logger('ConfigLoader')


Expand All @@ -21,29 +22,37 @@ def load(self, arguments: dict[str, Any]) -> dict[str, Any]:

class ConfigLoader(IConfigLoader):

def __init__(self, resource_root: str, default_config: str, config_file_argument: str = 'config_file') -> None:
self._resource_root = resource_root
self._default_config = f'{self._resource_root}/{default_config}'
def __init__(self, default_config_file: Path, config_file_argument: str = 'config_file') -> None:
self._default_config_file = default_config_file
self._config_file_argument = config_file_argument

def load(self, arguments: dict[str, Any]) -> dict[str, Any]:
config_file = Path(arguments[self._config_file_argument])
parser = ConfigParser(interpolation=None)

if not os.path.exists(config_file):
log.info('Loading default configuration file', config_file=self._default_config)
config_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(self._default_config, config_file)
else:
log.info('Using configuration file', config_file=str(config_file))
log.info('Loading default configuration', config_file=str(self._default_config_file))
parser.read(self._default_config_file)

parser = ConfigParser(interpolation=None)
parser.read(config_file)
if config_file := arguments.get(self._config_file_argument):
custom_config_file = Path(config_file)

if os.path.exists(custom_config_file):
log.info('Loading custom configuration', config_file=str(custom_config_file))
parser.read(custom_config_file)
else:
try:
log.info('Creating custom configuration using default', config_file=str(custom_config_file))
copy_file(self._default_config_file, custom_config_file)
except Exception as exception:
log.warn('Failed to create custom configuration file', error=str(exception))

configuration = {}

for section in parser.sections():
configuration.update(dict(parser[section]))

log.info('Loading command line arguments', arguments=arguments)
configuration.update(arguments)

log.info('Configuration loaded', configuration=configuration)

return configuration
83 changes: 30 additions & 53 deletions common_utility/fileDownloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
# SPDX-License-Identifier: MIT

import os
from typing import Optional
from pathlib import Path
from typing import Optional, Union
from urllib.parse import urlparse

from context_logger import get_logger
Expand All @@ -16,51 +17,33 @@

class IFileDownloader(object):

def download(
self,
file_url: str,
file_name: Optional[str] = None,
sub_dir: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
skip_if_exists: bool = True,
chunk_size: int = 1000 * 1000,
) -> str:
def download(self, file_url: str, file_name: Optional[str] = None, sub_dir: Optional[Union[str, Path]] = None,
headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True, chunk_size: int = 1000 * 1000
) -> Path:
raise NotImplementedError()

def download_and_copy(
self,
file_url: str,
sub_dirs: list[str],
file_name: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
skip_if_exists: bool = True,
chunk_size: int = 1000 * 1000,
) -> list[str]:
def download_and_copy(self, file_url: str, sub_dirs: list[Union[str, Path]], file_name: Optional[str] = None,
headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True,
chunk_size: int = 1000 * 1000) -> list[Path]:
raise NotImplementedError()


class FileDownloader(IFileDownloader):

def __init__(self, session_provider: ISessionProvider, download_location: str) -> None:
def __init__(self, session_provider: ISessionProvider, download_location: Path) -> None:
self._session_provider = session_provider
self._download_location = download_location

def download(
self,
file_url: str,
file_name: Optional[str] = None,
sub_dir: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
skip_if_exists: bool = True,
chunk_size: int = 1000 * 1000,
) -> str:
def download(self, file_url: str, file_name: Optional[str] = None, sub_dir: Optional[Union[str, Path]] = None,
headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True, chunk_size: int = 1000 * 1000
) -> Path:
if not urlparse(file_url).scheme:
return self._check_local_file(file_url)

file_path = self._get_target_path(file_url, file_name, sub_dir)

if skip_if_exists and os.path.isfile(file_path):
log.info('File already exists, skipping download', file=file_path)
log.info('File already exists, skipping download', file=str(file_path))
return file_path

headers = headers if headers else dict()
Expand All @@ -71,19 +54,13 @@ def download(

self._download_file(response, file_path, chunk_size)

log.info('Downloaded file', file=file_path)
log.info('Downloaded file', file=str(file_path))

return file_path

def download_and_copy(
self,
file_url: str,
sub_dirs: list[str],
file_name: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
skip_if_exists: bool = True,
chunk_size: int = 1000 * 1000,
) -> list[str]:
def download_and_copy(self, file_url: str, sub_dirs: list[Union[str, Path]], file_name: Optional[str] = None,
headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True,
chunk_size: int = 1000 * 1000) -> list[Path]:
if not sub_dirs:
raise ValueError('At least one sub directory must be provided')

Expand All @@ -93,22 +70,22 @@ def download_and_copy(
file_path = self._get_target_path(file_url, file_name, sub_dir)

if skip_if_exists and os.path.isfile(file_path):
log.info('File already exists, skipping copy', file=file_path)
log.info('File already exists, skipping copy', file=str(file_path))
else:
copy_file(downloaded_files[0], file_path)
log.info('Copied downloaded file', file=file_path)
log.info('Copied downloaded file', file=str(file_path))

downloaded_files.append(file_path)

return downloaded_files

def _check_local_file(self, file_url: str) -> str:
def _check_local_file(self, file_url: str) -> Path:
file_path = os.path.abspath(file_url)
if os.path.isfile(file_path):
log.info('Local file path provided, skipping download', file=file_path)
return file_path
log.info('Local file path provided, skipping download', file=str(file_path))
return Path(file_path)
else:
log.error('Local file does not exist', file=file_path)
log.error('Local file does not exist', file=str(file_path))
raise ValueError('Local file does not exist')

def _send_request(self, file_url: str, headers: dict[str, str]) -> Response:
Expand All @@ -121,21 +98,21 @@ def _send_request(self, file_url: str, headers: dict[str, str]) -> Response:

return response

def _get_target_path(self, file_url: str, file_name: Optional[str], sub_dir: Optional[str] = None) -> str:
if not file_name and '/' in file_url:
def _get_target_path(self, file_url: str, file_name: Optional[str],
sub_dir: Optional[Union[str, Path]] = None) -> Path:
if not file_name:
file_name = file_url.split('/')[-1]

download_dir = self._download_location

if sub_dir:
download_dir += f'/{sub_dir}'
download_dir = download_dir / sub_dir
create_directory(download_dir)

return f'{download_dir}/{file_name}'
return download_dir / file_name

def _download_file(self, response: Response, file_path: str, chunk_size: int) -> None:
if not os.path.exists(self._download_location):
os.makedirs(self._download_location)
def _download_file(self, response: Response, file_path: Path, chunk_size: int) -> None:
create_directory(self._download_location)

with open(file_path, 'wb') as asset_file:
for chunk in response.iter_content(chunk_size):
Expand Down
28 changes: 14 additions & 14 deletions common_utility/fileUtility.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,55 @@
import re
import shutil
from os.path import exists
from typing import Any
from pathlib import Path
from typing import Any, Union

from jinja2 import Environment, FileSystemLoader


def create_directory(directory: str) -> None:
def create_directory(directory: Union[str, Path]) -> None:
if not os.path.isdir(directory):
os.makedirs(directory, exist_ok=True)


def delete_directory(directory: str) -> None:
def delete_directory(directory: Union[str, Path]) -> None:
if os.path.isdir(directory):
shutil.rmtree(directory)


def create_file(file_path: str, content: str = '\n') -> None:
def create_file(file_path: Union[str, Path], content: str = '\n') -> None:
create_directory(os.path.dirname(file_path))
with open(file_path, 'w') as f:
f.write(content)


def copy_file(source: str, destination: str) -> None:
def copy_file(source: Union[str, Path], destination: Union[str, Path]) -> None:
create_directory(os.path.dirname(destination))
shutil.copy(source, destination)


def append_file(file_path: str, line: str) -> None:
def append_file(file_path: Union[str, Path], line: str) -> None:
with open(file_path, 'a+') as file:
file.writelines([f'{line}\n'])


def delete_file(file_path: str) -> None:
def delete_file(file_path: Union[str, Path]) -> None:
if exists(file_path):
if os.path.islink(file_path):
os.unlink(file_path)
else:
os.remove(file_path)


def is_file_matches_pattern(file_path: str, pattern: str) -> bool:
def is_file_matches_pattern(file_path: Union[str, Path], pattern: str) -> bool:
if not exists(file_path):
return False

with open(file_path) as file:
return re.search(pattern, file.read(), re.MULTILINE) is not None


def is_file_contains_lines(file: str, expected_lines: list[str]) -> bool:
def is_file_contains_lines(file: Union[str, Path], expected_lines: list[str]) -> bool:
if not exists(file):
return False

Expand All @@ -65,14 +66,13 @@ def is_file_contains_lines(file: str, expected_lines: list[str]) -> bool:
return file_lines_set == expected_lines_set


def render_template_file(resource_root: str, template_file: str, context: dict[str, Any]) -> str:
template_path = f'{resource_root}/{template_file}'
environment = Environment(loader=FileSystemLoader(os.path.dirname(template_path)))
template = environment.get_template(os.path.basename(template_path))
def render_template_file(template_file: Union[str, Path], context: dict[str, Any]) -> str:
environment = Environment(loader=FileSystemLoader(os.path.dirname(template_file)))
template = environment.get_template(os.path.basename(template_file))
return f'{template.render(context)}\n'


def replace_in_file(file_path: str, pattern: str, replacement: str) -> None:
def replace_in_file(file_path: Union[str, Path], pattern: str, replacement: str) -> None:
if not exists(file_path):
return

Expand Down
Loading
Loading