10 changes: 10 additions & 0 deletions minecode_pipelines/miners/npm.py
@@ -150,6 +150,16 @@ def get_npm_packageurls(name, npm_repo=NPM_REGISTRY_REPO):
return packageurls


def yield_npm_package_data(name, packageurls=None):
for purl in packageurls or get_npm_packageurls(name):
package_url = PackageURL.from_string(purl)
package_data_url = NPM_REGISTRY_REPO + name + "/" + package_url.version
response = requests.get(package_data_url)
if not response.ok:
continue
yield purl, response.json()


def load_npm_packages(packages_file):
with open(packages_file) as f:
packages_data = json.load(f)
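A minimal usage sketch for the new generator, assuming this module's context (`requests`, `PackageURL`) and that `NPM_REGISTRY_REPO` ends with a slash so the concatenation forms `{registry}{name}/{version}`; the `lodash` name and the `dist.tarball` lookup are illustrative:

from minecode_pipelines.miners.npm import yield_npm_package_data

# Streams (purl, registry-metadata) pairs and silently skips versions
# whose registry request fails (response.ok is False).
for purl, metadata in yield_npm_package_data("lodash"):
    print(purl, metadata.get("dist", {}).get("tarball"))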
32 changes: 26 additions & 6 deletions minecode_pipelines/miners/pypi.py
@@ -37,7 +37,8 @@
pypi_json_headers = {"Accept": "application/vnd.pypi.simple.v1+json"}


PYPI_REPO = "https://pypi.org/simple/"
PYPI_SIMPLE_REPO = "https://pypi.org/simple"
PYPI_METADATA_REPO = "https://pypi.org/pypi"
PYPI_TYPE = "pypi"


@@ -49,16 +50,23 @@ def get_pypi_packages(pypi_repo, logger=None):
return response.json()


def get_pypi_packageurls(name):
packageurls = []
def get_pypi_package_versions(name):
versions = []

project_index_api_url = PYPI_REPO + name
project_index_api_url = PYPI_SIMPLE_REPO + "/" + name
response = requests.get(project_index_api_url, headers=pypi_json_headers)
if not response.ok:
return packageurls
return versions

project_data = response.json()
for version in project_data.get("versions"):
versions = project_data.get("versions", [])
return versions


def get_pypi_packageurls(name):
packageurls = []

for version in get_pypi_package_versions(name=name):
purl = PackageURL(
type=PYPI_TYPE,
name=name,
@@ -69,6 +77,18 @@ def get_pypi_packageurls(name):
return packageurls


def yield_pypi_package_data(name, packageurls=None):
for purl in packageurls or get_pypi_packageurls(name):
package_url = PackageURL.from_string(purl)
package_data_url = (
PYPI_METADATA_REPO + "/" + name + "/" + package_url.version + "/json"
)
response = requests.get(package_data_url, headers=pypi_json_headers)
if not response.ok:
continue
yield purl, response.json()


def load_pypi_packages(packages_file):
with open(packages_file) as f:
packages_data = json.load(f)
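A usage sketch for the PyPI counterpart: the URL built above follows PyPI's JSON API (`https://pypi.org/pypi/{name}/{version}/json`), whose responses carry the package details under an `info` key; the project name `requests` is illustrative:

from minecode_pipelines.miners.pypi import yield_pypi_package_data

# Each yielded pair is (purl string, decoded JSON API response).
for purl, metadata in yield_pypi_package_data("requests"):
    print(purl, metadata.get("info", {}).get("summary"))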
183 changes: 149 additions & 34 deletions minecode_pipelines/pipelines/__init__.py
@@ -15,13 +15,14 @@
from collections.abc import Iterable
from pathlib import Path

from aboutcode.federated import DataCluster
from aboutcode.federated import DataFederation
from aboutcode.pipeline import LoopProgress
from packageurl import PackageURL
from scanpipe.pipelines import Pipeline
from scanpipe.pipes import federatedcode

from minecode_pipelines import pipes
from minecode_pipelines.pipes import write_package_data_to_file
from minecode_pipelines.pipes import write_packageurls_to_file

module_logger = logging.getLogger(__name__)
@@ -88,16 +89,22 @@ def fetch_federation_config(self):
data_federation = DataFederation.from_url(
name="aboutcode-data",
remote_root_url="https://github.com/aboutcode-data",
branch="add-datafile_name",
)
self.data_cluster = data_federation.get_cluster("purls")
self.data_clusters = {
"purls": data_federation.get_cluster("purls"),
"api_package_version_responses": data_federation.get_cluster(
"api_package_version_responses"
),
}

def mine_and_publish_packageurls(self):
"""Mine and publish PackageURLs."""

_mine_and_publish_packageurls(
packageurls=self.mine_packageurls(),
total_package_count=self.packages_count(),
data_cluster=self.data_cluster,
data_clusters=self.data_clusters,
checked_out_repos=self.checked_out_repos,
working_path=self.working_path,
append_purls=self.append_purls,
@@ -141,16 +148,75 @@ def log(self, message, level=logging.INFO):
self.append_to_log(message)


def commit_and_push_packageurls(
current_working_repos,
commit_msg_func,
checkpoint_func,
checkpoint_on_commit,
checkpoint_interval,
last_checkpoint_call,
logger,
):
"""
Given a list of `current_working_repos`, commit and push changes to each repo with the commit message returned from `commit_msg_func`.

If `checkpoint_on_commit` is True and `checkpoint_func` exists, then we execute `checkpoint_func`.

If `checkpoint_on_commit` is False, then we determine if it is time to call `checkpoint_func` or not.
"""

if logger:
logger("Trying to commit PackageURLs.")

for repo_checkout in current_working_repos:
pipes.commit_and_push_checkout(
local_checkout=repo_checkout,
commit_message=commit_msg_func(repo_checkout["commit_count"] + 1),
logger=logger,
)

if checkpoint_on_commit and checkpoint_func:
checkpoint_func()

if not checkpoint_on_commit:
time_now = time.time()
checkpoint_due = time_now - last_checkpoint_call >= checkpoint_interval
if checkpoint_func and checkpoint_due:
checkpoint_func()
last_checkpoint_call = time_now

# Return the timestamp so callers can rebind it; the local assignment above
# does not propagate back to the caller on its own.
return last_checkpoint_call

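The interval branch is plain wall-clock arithmetic; a standalone sketch of the rule, where the 30-second interval and the 45-second gap are illustrative values only:

import time

checkpoint_interval = 30  # seconds between interval-based checkpoints (assumed)
last_checkpoint_call = time.time() - 45  # pretend the last checkpoint ran 45s ago

# Mirrors the checkpoint_due test above: a checkpoint is due once the
# full interval has elapsed since the last one.
checkpoint_due = time.time() - last_checkpoint_call >= checkpoint_interval
print(checkpoint_due)  # True here, so checkpoint_func would run and the timestamp rebind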

def get_repo_checkout_from_data_cluster(
data_cluster, purl, checked_out_repos, working_path, logger, datafile_name=None
):
"""
Return a `repo_checkout` and `datafile_path` for a given `purl`, `data_cluster`, and `working_path`.

Add `repo_checkout` to `checked_out_repos`.
"""
repo, datafile_path = data_cluster.get_datafile_repo_and_path(
purl=purl, datafile_name=datafile_name
)
if repo not in checked_out_repos:
checked_out_repos[repo] = pipes.init_local_checkout(
repo_name=repo,
working_path=working_path,
logger=logger,
)
repo_checkout = checked_out_repos[repo]
return repo_checkout, datafile_path

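The helper memoizes checkouts by repo name so each data repo is initialized at most once per run; a standalone sketch of that pattern, using a stub dict with the checkout fields this module relies on and a hypothetical repo name:

checked_out_repos = {}

def get_checkout(repo_name):
    # Initialize (in the pipeline: clone) only on first sight of the repo.
    if repo_name not in checked_out_repos:
        checked_out_repos[repo_name] = {
            "repo": repo_name,
            "file_to_commit": set(),
            "file_processed_count": 0,
            "commit_count": 0,
        }
    return checked_out_repos[repo_name]

assert get_checkout("aboutcode-data-0001") is get_checkout("aboutcode-data-0001")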

def _mine_and_publish_packageurls(
packageurls: Iterable,
total_package_count: int,
data_cluster: DataCluster,
data_clusters: dict,
checked_out_repos: dict,
working_path: Path,
append_purls: bool,
commit_msg_func: Callable,
logger: Callable,
batch_size: int = 4000,
batch_size: int = 100,
checkpoint_on_commit: bool = False,
checkpoint_func: Callable = None,
checkpoint_freq: int = 30,
@@ -172,45 +238,94 @@ def _mine_and_publish_packageurls(
iterator = progress.iter(iterator)
logger(f"Mine PackageURL for {total_package_count:,d} packages.")

for base, purls in iterator:
purls_data_cluster = data_clusters["purls"]
api_package_version_responses_data_cluster = data_clusters["api_package_version_responses"]

current_working_repos = []
currently_processed_files_count = 0
for base, purls, purls_and_package_data in iterator:
if not purls or not base:
continue

package_repo, datafile_path = data_cluster.get_datafile_repo_and_path(purl=base)
if package_repo not in checked_out_repos:
checked_out_repos[package_repo] = pipes.init_local_checkout(
repo_name=package_repo,
working_path=working_path,
logger=logger,
)
purls_package_repo_checkout, purls_datafile_path = get_repo_checkout_from_data_cluster(
data_cluster=purls_data_cluster,
purl=base,
checked_out_repos=checked_out_repos,
working_path=working_path,
logger=logger,
)
if purls_package_repo_checkout not in current_working_repos:
current_working_repos.append(purls_package_repo_checkout)

checkout = checked_out_repos[package_repo]
purl_file = write_packageurls_to_file(
repo=checkout["repo"],
relative_datafile_path=datafile_path,
repo=purls_package_repo_checkout["repo"],
relative_datafile_path=purls_datafile_path,
packageurls=purls,
append=append_purls,
)
checkout["file_to_commit"].add(purl_file)
checkout["file_processed_count"] += 1

if len(checkout["file_to_commit"]) > batch_size:
if logger:
logger("Trying to commit PackageURLs.")
pipes.commit_and_push_checkout(
local_checkout=checkout,
commit_message=commit_msg_func(checkout["commit_count"] + 1),
purls_package_repo_checkout["file_to_commit"].add(purl_file)
purls_package_repo_checkout["file_processed_count"] += 1
currently_processed_files_count += 1

if currently_processed_files_count > batch_size:
last_checkpoint_call = commit_and_push_packageurls(
current_working_repos=current_working_repos,
commit_msg_func=commit_msg_func,
checkpoint_func=checkpoint_func,
checkpoint_on_commit=checkpoint_on_commit,
checkpoint_interval=checkpoint_interval,
last_checkpoint_call=last_checkpoint_call,
logger=logger,
)
if checkpoint_on_commit and checkpoint_func:
checkpoint_func()

if not checkpoint_on_commit:
time_now = time.time()
checkpoint_due = time_now - last_checkpoint_call >= checkpoint_interval
if checkpoint_func and checkpoint_due:
checkpoint_func()
last_checkpoint_call = time_now
current_working_repos = []
currently_processed_files_count = 0

for purl, api_package_version_response in purls_and_package_data:
if isinstance(purl, PackageURL):
package_url = purl
else:
package_url = PackageURL.from_string(purl)
if package_url.type == "maven":
datafile_name = "pom.xml"
else:
datafile_name = "api_package_version_response.json"
api_package_version_responses_repo_checkout, api_package_metadata_datafile_path = (
get_repo_checkout_from_data_cluster(
data_cluster=api_package_version_responses_data_cluster,
purl=purl,
checked_out_repos=checked_out_repos,
working_path=working_path,
logger=logger,
datafile_name=datafile_name,
)
)
if api_package_version_responses_repo_checkout not in current_working_repos:
current_working_repos.append(api_package_version_responses_repo_checkout)

api_package_version_response_file = write_package_data_to_file(
repo=api_package_version_responses_repo_checkout["repo"],
relative_api_package_metadata_datafile_path=api_package_metadata_datafile_path,
package_data=api_package_version_response,
)

api_package_version_responses_repo_checkout["file_to_commit"].add(
api_package_version_response_file
)
api_package_version_responses_repo_checkout["file_processed_count"] += 1
currently_processed_files_count += 1

if currently_processed_files_count > batch_size:
last_checkpoint_call = commit_and_push_packageurls(
current_working_repos=current_working_repos,
commit_msg_func=commit_msg_func,
checkpoint_func=checkpoint_func,
checkpoint_on_commit=checkpoint_on_commit,
checkpoint_interval=checkpoint_interval,
last_checkpoint_call=last_checkpoint_call,
logger=logger,
)
current_working_repos = []
currently_processed_files_count = 0

for checkout in checked_out_repos.values():
final_commit_count = checkout["commit_count"] + 1
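The maven special-case in the loop above (store a `pom.xml`, otherwise the raw API response) reads as a small pure function; a sketch with illustrative purls:

from packageurl import PackageURL

def datafile_name_for(purl):
    # Mirrors the inline branch in _mine_and_publish_packageurls.
    package_url = purl if isinstance(purl, PackageURL) else PackageURL.from_string(purl)
    if package_url.type == "maven":
        return "pom.xml"
    return "api_package_version_response.json"

assert datafile_name_for("pkg:maven/org.apache.commons/commons-lang3@3.12.0") == "pom.xml"
assert datafile_name_for("pkg:npm/lodash@4.17.21") == "api_package_version_response.json"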
2 changes: 1 addition & 1 deletion minecode_pipelines/pipelines/mine_alpine.py
@@ -40,7 +40,7 @@ def steps(cls):

def mine_and_publish_alpine_packageurls(self):
alpine.mine_and_publish_alpine_packageurls(
data_cluster=self.data_cluster,
data_clusters=self.data_clusters,
checked_out_repos=self.checked_out_repos,
working_path=self.working_path,
commit_msg_func=self.commit_message,
2 changes: 1 addition & 1 deletion minecode_pipelines/pipelines/mine_composer.py
@@ -79,7 +79,7 @@ def mine_and_publish_packageurls(self):
_mine_and_publish_packageurls(
packageurls=self.mine_packageurls(),
total_package_count=self.packages_count(),
data_cluster=self.data_cluster,
data_clusters=self.data_clusters,
checked_out_repos=self.checked_out_repos,
working_path=self.working_path,
append_purls=self.append_purls,
6 changes: 3 additions & 3 deletions minecode_pipelines/pipelines/mine_debian.py
@@ -42,7 +42,7 @@ def steps(cls):
cls.create_federatedcode_working_dir,
cls.fetch_federation_config,
cls.fetch_checkpoint_and_debian_index,
cls.mine_and_publish_alpine_packageurls,
cls.mine_and_publish_debian_packageurls,
cls.save_check_point,
cls.delete_working_dir,
)
@@ -62,13 +62,13 @@ def fetch_checkpoint_and_debian_index(self):
self.log(f"last_checkpoint: {self.last_checkpoint}")
self.debian_collector = debian.DebianCollector(logger=self.log)

def mine_and_publish_alpine_packageurls(self):
def mine_and_publish_debian_packageurls(self):
_mine_and_publish_packageurls(
packageurls=self.debian_collector.get_packages(
previous_index_last_modified_date=self.last_checkpoint,
),
total_package_count=None,
data_cluster=self.data_cluster,
data_clusters=self.data_clusters,
checked_out_repos=self.checked_out_repos,
working_path=self.working_path,
append_purls=self.append_purls,
2 changes: 1 addition & 1 deletion minecode_pipelines/pipelines/mine_npm.py
@@ -87,7 +87,7 @@ def mine_and_publish_packageurls(self):
_mine_and_publish_packageurls(
packageurls=self.mine_packageurls(),
total_package_count=self.packages_count(),
data_cluster=self.data_cluster,
data_clusters=self.data_clusters,
checked_out_repos=self.checked_out_repos,
working_path=self.working_path,
append_purls=self.append_purls,
2 changes: 1 addition & 1 deletion minecode_pipelines/pipelines/mine_nuget.py
@@ -77,4 +77,4 @@ def mine_packageurls(self):
base_purl=base,
versions=versions,
)
yield base, packageurls
yield base, packageurls, []
2 changes: 1 addition & 1 deletion minecode_pipelines/pipelines/mine_pypi.py
@@ -84,7 +84,7 @@ def mine_and_publish_packageurls(self):
_mine_and_publish_packageurls(
packageurls=self.mine_packageurls(),
total_package_count=self.packages_count(),
data_cluster=self.data_cluster,
data_clusters=self.data_clusters,
checked_out_repos=self.checked_out_repos,
working_path=self.working_path,
append_purls=self.append_purls,
2 changes: 1 addition & 1 deletion minecode_pipelines/pipelines/mine_swift.py
@@ -46,7 +46,7 @@ def steps(cls):
)

def clone_swift_index(self):
"""Clone the Cargo index Repo."""
"""Clone the Swift index Repo."""
self.swift_index_repo = federatedcode.clone_repository(
repo_url=self.swift_index_repo_url,
clone_path=self.working_path / "swift-index",