|
| 1 | +import csv |
| 2 | +import glob |
| 3 | +import os |
| 4 | + |
| 5 | +from dsaps import models |
| 6 | + |
| 7 | + |
def create_file_dict(file_path, file_type):
    """Creates a dict of file IDs and file paths.

    Remote locations (anything starting with 'http') are delegated to
    models.build_file_dict_remote; local paths are walked recursively and
    keyed by file name without its extension.
    """
    if file_path.startswith('http'):
        return models.build_file_dict_remote(file_path, file_type, {})
    matched_paths = glob.glob(f'{file_path}/**/*.{file_type}', recursive=True)
    return {
        os.path.splitext(os.path.basename(path))[0]: path
        for path in matched_paths
    }
| 19 | + |
| 20 | + |
def create_metadata_id_list(metadata_csv):
    """Creates a list of IDs from a metadata CSV.

    Reads the 'file_identifier' column of every row, in file order.
    """
    with open(metadata_csv) as csvfile:
        return [row['file_identifier'] for row in csv.DictReader(csvfile)]
| 30 | + |
| 31 | + |
def match_files_to_metadata(file_dict, metadata_ids):
    """Creates a list of files matched to metadata records.

    A file ID matches a metadata ID when it starts with it. The file ID is
    appended once per matching metadata ID, so duplicates are possible when
    several metadata IDs match the same file (callers de-duplicate with
    set()).
    """
    # Iterate keys directly; the original looped over .items() without
    # ever using the values.
    return [
        file_id
        for file_id in file_dict
        for metadata_id in metadata_ids
        if file_id.startswith(metadata_id)
    ]
| 40 | + |
| 41 | + |
def match_metadata_to_files(file_dict, metadata_ids):
    """Creates a list of metadata records matched to files.

    A metadata ID matches a file ID when the file ID starts with it. The
    metadata ID is appended once per matching file, so duplicates are
    possible when several files match one metadata record.
    """
    return [
        metadata_id
        for metadata_id in metadata_ids
        for file_id in file_dict
        if file_id.startswith(metadata_id)
    ]
| 50 | + |
| 51 | + |
def reconcile_files_and_metadata(metadata_csv, output_path,
                                 file_path, file_type):
    """Runs a reconciliation of files and metadata.

    Writes three report CSVs (files with no metadata, metadata with no
    files, matched metadata) plus an updated metadata CSV containing only
    matched rows. NOTE(review): output paths are built by plain string
    concatenation, so output_path presumably ends with a path separator —
    confirm against callers.
    """
    file_dict = create_file_dict(file_path, file_type)
    metadata_ids = create_metadata_id_list(metadata_csv)
    file_matches = match_files_to_metadata(file_dict, metadata_ids)
    metadata_matches = match_metadata_to_files(file_dict, metadata_ids)
    # Records left over on either side of the match.
    no_files = set(metadata_ids) - set(metadata_matches)
    no_metadata = set(file_dict) - set(file_matches)
    models.create_csv_from_list(no_metadata, f'{output_path}no_metadata')
    models.create_csv_from_list(no_files, f'{output_path}no_files')
    models.create_csv_from_list(metadata_matches,
                                f'{output_path}metadata_matches')
    update_metadata_csv(metadata_csv, output_path, metadata_matches)
| 67 | + |
| 68 | + |
def update_metadata_csv(metadata_csv, output_path, metadata_matches):
    """Creates an updated CSV of metadata records with matching files.

    Copies from metadata_csv only the rows whose 'file_identifier' appears
    in metadata_matches, writing them to 'updated-<original name>' under
    output_path (joined by plain concatenation, consistent with the rest
    of this module — output_path presumably ends with a separator).
    """
    # Hoist to a set: O(1) membership per row instead of scanning the
    # matches list for every record.
    matched_ids = set(metadata_matches)
    with open(metadata_csv) as csvfile:
        reader = csv.DictReader(csvfile)
        upd_md_file_name = f'updated-{os.path.basename(metadata_csv)}'
        # newline='' is required by the csv module when writing; without it
        # the writer emits blank lines on platforms that translate newlines.
        with open(f'{output_path}{upd_md_file_name}', 'w',
                  newline='') as updated_csv:
            writer = csv.DictWriter(updated_csv,
                                    fieldnames=reader.fieldnames)
            writer.writeheader()
            for row in reader:
                if row['file_identifier'] in matched_ids:
                    writer.writerow(row)
0 commit comments