-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquality_check.py
More file actions
2379 lines (1930 loc) · 96.5 KB
/
quality_check.py
File metadata and controls
2379 lines (1930 loc) · 96.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Pre-processing quality checks for raw input files.
Each check is a small function with signature:
check(file_path, **context) -> (passed: bool, message: str)
`run_quality_checks` orchestrates them, writes a log file grouped by section,
and returns True if every check passed.
"""
import codecs
import os
import re
from datetime import datetime
import pandas as pd
from rdkit import Chem
from rdkit import RDLogger
# RDKit normally prints noisy errors on bad SMILES to stderr; we report them
# in the QC log instead, so silence its built-in logger here.
RDLogger.DisableLog("rdApp.*")
# ---------- Configuration ----------
MAX_FILE_SIZE_GB = 10
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_GB * 1024 ** 3
MIN_BATCH_NUMBER = 0
MAX_BATCH_NUMBER = 10000
# Filename format: asms_<provider>_<batch>_<library>_<date>.csv
# Library names may contain underscores; date (YYYYMMDD) anchors the tail.
FILENAME_RE = re.compile(
r"^asms_(?P<provider>[a-z]+)_(?P<batch>\d{1,5})_(?P<library>.+)_(?P<date>\d{8})\.csv$",
re.IGNORECASE,
)
# Allowed characters in the filename (alphanumeric, underscore, period, hyphen).
FILENAME_ALLOWED_RE = re.compile(r"^[A-Za-z0-9_.\-]+$")
# ---------- Helpers ----------
def _parse_filename(file_path):
"""Returns the regex match object or None."""
return FILENAME_RE.match(os.path.basename(file_path))
def _load_providers(providers_csv_path):
"""Load valid provider acronyms from a CSV with an `acronym` column.
Returns a list of lowercase strings, or None if the file is missing.
"""
if not providers_csv_path or not os.path.exists(providers_csv_path):
return None
try:
df = pd.read_csv(providers_csv_path)
except Exception:
return None
if "acronym" not in df.columns:
return []
return [str(a).strip().lower() for a in df["acronym"].dropna()]
def _load_data_generators(providers_csv_path):
"""Load registered data-generator names from Providers.csv.
Reads the `data_generator_name` column when present. Returns a set of
strings (exact match, no case-folding) or None if the file is missing.
Returns an empty set if the column is absent (older Providers.csv files
that haven't been extended yet).
"""
if not providers_csv_path or not os.path.exists(providers_csv_path):
return None
try:
df = pd.read_csv(providers_csv_path)
except Exception:
return None
if "data_generator_name" not in df.columns:
return set()
return {str(g).strip() for g in df["data_generator_name"].dropna() if str(g).strip()}
def _list_libraries(masterlist_dir):
"""Return library names (filename stems) from MasterLists/, excluding the
MasterList_Information mapping file. None if directory is missing.
"""
if not masterlist_dir or not os.path.isdir(masterlist_dir):
return None
libs = []
for name in os.listdir(masterlist_dir):
if name == "MasterList_Information.xlsx":
continue
stem, ext = os.path.splitext(name)
if ext.lower() in (".xlsx", ".xls", ".csv"):
libs.append(stem)
return libs
def _split_smiles(value):
"""Split a SMILES cell on ';' (the isomer separator) and trim each part.
Empty parts are dropped. Used by SMILES validity and library-membership
checks so a row with `'CC;CCC'` (a pre-Step-4 isomer group) is validated
one component at a time rather than failing as a single malformed SMILES.
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return []
return [p.strip() for p in str(value).split(";") if p.strip()]
def _load_associated_library_df(input_file_path, masterlist_dir, masterlist_info_path):
"""Resolve the master list file for `input_file_path` via MasterList_Information
and return its full DataFrame, or None on any failure (missing folder,
missing mapping file, mapping row absent, library file absent).
Column existence is checked by the callers since different checks need
different columns.
"""
if not (masterlist_dir and os.path.isdir(masterlist_dir)):
return None
if not (masterlist_info_path and os.path.exists(masterlist_info_path)):
return None
try:
info = pd.read_excel(masterlist_info_path)
except Exception:
return None
if not {"FileName", "MaterListName"}.issubset(info.columns):
return None
file_name = os.path.basename(input_file_path)
match = info.loc[info["FileName"] == file_name, "MaterListName"]
if match.empty:
return None
lib_name = match.values[0]
lib_path = os.path.join(masterlist_dir, f"{lib_name}.xlsx")
if not os.path.exists(lib_path):
return None
try:
return pd.read_excel(lib_path)
except Exception:
return None
def _load_associated_library_smiles(input_file_path, masterlist_dir, masterlist_info_path):
"""Returns a set of SMILES from the associated library, or None."""
lib_df = _load_associated_library_df(input_file_path, masterlist_dir, masterlist_info_path)
if lib_df is None or "SMILES" not in lib_df.columns:
return None
return set(lib_df["SMILES"].dropna().astype(str))
def _load_associated_library_formulas(input_file_path, masterlist_dir, masterlist_info_path):
"""Returns the set of formula strings from the associated library, or None.
The library's formula column is matched case-insensitively (`formula`,
`Formula`, `FORMULA` all work). Values are trimmed.
"""
lib_df = _load_associated_library_df(input_file_path, masterlist_dir, masterlist_info_path)
if lib_df is None:
return None
cols_lower = {c.lower(): c for c in lib_df.columns}
if "formula" not in cols_lower:
return None
return set(lib_df[cols_lower["formula"]].dropna().astype(str).str.strip())
def _load_varchar_columns(meta_csv_path):
"""Return the set of column names declared as VARCHAR in ASMS Meta Data.csv.
Reads row 2 (the type row) and returns every column whose declared type is
`VARCHAR` (case-insensitive). Returns an empty set if the file is missing,
unreadable, or doesn't have a second row. Used by `_load_dataframe` so
pandas doesn't auto-cast numeric-looking string columns (e.g. EXPERIMENT_DATE
values like `20250512`) to integers.
"""
if not meta_csv_path or not os.path.exists(meta_csv_path):
return set()
try:
df = pd.read_csv(meta_csv_path, header=None, nrows=2)
except Exception:
return set()
if df.shape[0] < 2:
return set()
headers = [str(h).strip() for h in df.iloc[0]]
types = [str(t).strip().upper() for t in df.iloc[1]]
return {h for h, t in zip(headers, types) if h and t == "VARCHAR"}
def _load_dataframe(file_path, varchar_columns=None):
"""Read the input CSV once and drop fully duplicate rows.
Column-content checks run against this cleaned DataFrame so that the
duplicates flagged by the row-content check (Check 15) do not skew the
per-column metrics. Returns None if the file cannot be parsed.
`varchar_columns` lists names that should be forced to string dtype on
read — typically every column declared `VARCHAR` in `ASMS Meta Data.csv`.
This stops pandas from auto-casting numeric-looking strings (e.g. dates
like `20250512`) to integers.
"""
varchar_columns = set(varchar_columns or [])
try:
header_only = pd.read_csv(file_path, nrows=0)
except Exception:
return None
dtype_arg = {c: str for c in varchar_columns if c in header_only.columns}
try:
df = pd.read_csv(file_path, dtype=dtype_arg) if dtype_arg else pd.read_csv(file_path)
except Exception:
return None
return df.drop_duplicates(keep="first").reset_index(drop=True)
def _load_meta_columns(meta_csv_path):
"""Load the valid column names from the ASMS Meta Data reference CSV.
The reference file's header row lists the canonical column names; the
second row holds data types. Whitespace is stripped and duplicates
(e.g. an accidental trailing-space variant) are collapsed.
Returns a list of strings, or None if the file is missing/unreadable.
"""
if not meta_csv_path or not os.path.exists(meta_csv_path):
return None
try:
df = pd.read_csv(meta_csv_path, nrows=0)
except Exception:
return None
seen = []
for col in df.columns:
name = str(col).strip()
if name and name not in seen:
seen.append(name)
return seen
# ---------- File-format checks ----------
def check_file_opens(file_path, **_):
"""File can be opened for reading."""
try:
with open(file_path, "rb") as f:
f.read(1)
except OSError as e:
return False, f"could not open file: {e}"
return True, "file opens for reading"
def check_is_csv(file_path, **_):
"""File has a .csv extension."""
ext = os.path.splitext(file_path)[1].lower()
if ext == ".csv":
return True, "extension is '.csv'"
return False, f"expected '.csv', got '{ext or '(no extension)'}'"
def check_file_not_empty(file_path, **_):
"""File size is greater than zero bytes."""
try:
size = os.path.getsize(file_path)
except OSError as e:
return False, f"could not stat file: {e}"
if size == 0:
return False, "file is empty (0 bytes)"
return True, f"file size is {size:,} bytes ({size / (1024 ** 2):.2f} MB)"
def check_file_size_under_limit(file_path, **_):
"""File size is below the configured limit (default 10 GB)."""
try:
size = os.path.getsize(file_path)
except OSError as e:
return False, f"could not stat file: {e}"
if size > MAX_FILE_SIZE_BYTES:
return False, (
f"file size {size / (1024 ** 3):.2f} GB exceeds the "
f"{MAX_FILE_SIZE_GB} GB limit"
)
return True, (
f"file size {size / (1024 ** 3):.4f} GB is within the "
f"{MAX_FILE_SIZE_GB} GB limit"
)
def check_encoding_is_utf8(file_path, chunk_size=1024 * 1024, **_):
"""File contents decode cleanly as UTF-8 (reads the whole file in chunks)."""
decoder = codecs.getincrementaldecoder("utf-8")()
try:
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
decoder.decode(b"", final=True)
break
decoder.decode(chunk, final=False)
except UnicodeDecodeError as e:
return False, f"not UTF-8: {e}"
except OSError as e:
return False, f"read error: {e}"
return True, "decodes as UTF-8"
def check_csv_parseable(file_path, **_):
"""File parses as CSV via pandas; also report rows, columns, and column names."""
try:
df = pd.read_csv(file_path, nrows=5)
except Exception as e:
return False, f"pandas could not parse as CSV: {e}"
n_cols = len(df.columns)
if n_cols < 1:
return False, "parsed but has zero columns"
# Count total data rows (line count minus header). Streamed so it stays
# cheap even for multi-GB files.
try:
with open(file_path, "r", encoding="utf-8") as f:
line_count = sum(1 for _ in f)
rows_msg = f"{max(line_count - 1, 0):,} rows"
except OSError:
rows_msg = "row count unavailable"
col_list = ", ".join(df.columns)
return True, (
f"parsed as CSV with {n_cols} columns and {rows_msg}. "
f"Columns: {col_list}"
)
def check_columns_match_metadata(file_path, meta_columns=None, **_):
"""File's column names match the reference list in ASMS Meta Data.csv."""
if meta_columns is None:
return False, "metadata reference unavailable (ASMS Meta Data.csv not found)"
if not meta_columns:
return False, "metadata reference has no columns"
try:
df = pd.read_csv(file_path, nrows=0)
except Exception as e:
return False, f"could not read file columns: {e}"
file_cols = [str(c).strip() for c in df.columns]
valid = set(meta_columns)
file_set = set(file_cols)
missing = [c for c in meta_columns if c not in file_set]
extra = [c for c in file_cols if c not in valid]
if not missing and not extra:
return True, "columns match ASMS Meta Data.csv reference"
parts = ["columns do not match ASMS Meta Data.csv"]
if missing:
parts.append(f"missing required columns ({len(missing)}): {missing}")
if extra:
parts.append(f"extra columns not in reference ({len(extra)}): {extra}")
return False, "; ".join(parts)
# ---------- Filename-format checks ----------
def check_filename_no_special_chars(file_path, **_):
"""Filename has only alphanumerics, underscores, periods, and hyphens."""
name = os.path.basename(file_path)
if FILENAME_ALLOWED_RE.match(name):
return True, "no special characters or spaces in filename"
bad = sorted(set(c for c in name if not re.match(r"[A-Za-z0-9_.\-]", c)))
return False, f"filename contains disallowed character(s): {bad}"
def check_filename_starts_with_asms(file_path, **_):
"""Filename begins with 'asms_'."""
name = os.path.basename(file_path)
if name.lower().startswith("asms_"):
return True, "filename starts with 'asms_'"
return False, f"filename should start with 'asms_', got '{name[:10]}...'"
def check_filename_overall_format(file_path, **_):
"""Filename matches asms_<provider>_<batch>_<library>_<date>.csv."""
match = _parse_filename(file_path)
if match:
return True, (
f"parsed: provider='{match.group('provider')}', "
f"batch='{match.group('batch')}', "
f"library='{match.group('library')}', "
f"date='{match.group('date')}'"
)
return False, (
"filename does not match 'asms_<provider>_<batchN>_<library>_<YYYYMMDD>.csv'"
)
def check_provider_acronym(file_path, providers=None, **_):
"""Provider acronym in the filename is in the registered list."""
match = _parse_filename(file_path)
if not match:
return False, "filename did not parse; cannot extract provider"
provider = match.group("provider").lower()
if providers is None:
return False, "providers list unavailable (Providers.csv not found)"
if not providers:
return False, "providers list is empty"
if provider in providers:
return True, f"provider '{provider}' is registered"
return False, f"provider '{provider}' not in registered list: {providers}"
def check_batch_number_range(file_path, **_):
"""Batch number in the filename is between MIN_BATCH_NUMBER and MAX_BATCH_NUMBER."""
match = _parse_filename(file_path)
if not match:
return False, "filename did not parse; cannot extract batch number"
batch_str = match.group("batch")
try:
batch_int = int(batch_str)
except ValueError:
return False, f"batch number '{batch_str}' is not an integer"
if MIN_BATCH_NUMBER <= batch_int <= MAX_BATCH_NUMBER:
return True, f"batch number {batch_int} (from '{batch_str}') is in range [{MIN_BATCH_NUMBER}, {MAX_BATCH_NUMBER}]"
return False, (
f"batch number {batch_int} is outside [{MIN_BATCH_NUMBER}, {MAX_BATCH_NUMBER}]"
)
def check_library_name(file_path, libraries=None, **_):
"""Library name in the filename matches a file in MasterLists/."""
match = _parse_filename(file_path)
if not match:
return False, "filename did not parse; cannot extract library name"
library = match.group("library")
if libraries is None:
return False, "libraries list unavailable (MasterLists/ not found)"
if not libraries:
return False, "no registered libraries (MasterLists/ is empty)"
if library in libraries:
return True, f"library '{library}' is registered"
return False, f"library '{library}' not in registered list: {libraries}"
def check_date_valid_and_not_future(file_path, **_):
"""Date in the filename is a valid YYYYMMDD and not in the future."""
match = _parse_filename(file_path)
if not match:
return False, "filename did not parse; cannot extract date"
date_str = match.group("date")
try:
date_obj = datetime.strptime(date_str, "%Y%m%d").date()
except ValueError as e:
return False, f"date '{date_str}' is not a valid YYYYMMDD: {e}"
today = datetime.now().date()
if date_obj > today:
return False, f"date {date_obj.isoformat()} is in the future (today is {today.isoformat()})"
return True, f"date {date_obj.isoformat()} is valid and not in the future"
# ---------- Row content checks ----------
def check_no_duplicate_rows(file_path, output_dir=None, **_):
"""No two rows in the file are identical across all columns.
Emits a WARN (not FAIL) when duplicates exist, because Step 3
(anomaly_selection) will drop them anyway. Also writes
`FullyDuplicate_rows_report.csv` into `output_dir` listing every row that
is part of a duplicate group (all copies, not just the dropped ones).
"""
try:
df = pd.read_csv(file_path)
except Exception as e:
return False, f"could not read file: {e}"
dup_mask_first = df.duplicated(keep="first")
n_dups = int(dup_mask_first.sum())
if n_dups == 0:
return True, f"no fully duplicate rows (checked {len(df):,} rows)"
# Write a report of every row in any duplicate group (keep=False shows all copies)
report_msg = ""
if output_dir:
try:
os.makedirs(output_dir, exist_ok=True)
report_df = df[df.duplicated(keep=False)].copy()
report_df.insert(0, "FileLine", report_df.index + 2) # +2: 1-index + header
report_path = os.path.join(output_dir, "FullyDuplicate_rows_report.csv")
report_df.to_csv(report_path, index=False)
report_msg = f"; report saved to {report_path}"
except Exception as e:
report_msg = f"; failed to write report: {e}"
# 1-indexed line numbers including the header row (so row 0 in df is line 2)
dup_line_nums = [int(i) + 2 for i in dup_mask_first[dup_mask_first].index[:5]]
suffix = f" (and {n_dups - 5} more)" if n_dups > 5 else ""
return True, (
f"found {n_dups:,} fully duplicate row(s) "
f"(extras will be removed in Step 3). "
f"First duplicates at file line(s): {dup_line_nums}{suffix}{report_msg}"
), "WARN"
# ---------- Column content check helpers ----------
#
# All column-content checks operate on the DataFrame pre-loaded into the
# orchestrator context (`df`), which has been read with pandas.read_csv and
# fully-duplicate-row-deduped. Each helper handles the standard preconditions
# (df present, column present) so the per-column wrapper stays one line.
def _ensure_df_and_columns(df, *columns):
"""Returns (ok, fail_message). ok=True means all required columns exist."""
if df is None:
return False, "dataframe unavailable (file could not be read)"
missing = [c for c in columns if c not in df.columns]
if missing:
return False, f"required column(s) missing from file: {missing}"
return True, None
def _check_column_is_string(df, column):
"""Column dtype is object and every non-null value is a Python str."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column]
non_null = col.dropna()
if non_null.empty:
return True, f"'{column}' is empty (no values to type-check)"
bad = sorted({type(v).__name__ for v in non_null if not isinstance(v, str)})
if bad:
# WARN, not FAIL — dtype mismatch is informational (pandas often auto-casts
# numeric-looking columns to int/float). Value-level checks still run.
return True, (
f"'{column}' has {len(bad)} non-string type(s): {bad} "
f"(pandas dtype: {col.dtype})"
), "WARN"
return True, f"all {len(non_null):,} non-null '{column}' values are strings"
def _check_column_no_whitespace(df, column):
"""No values in the column have leading or trailing whitespace."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column].dropna().astype(str)
has_ws = col.str.startswith(" ") | col.str.endswith(" ") | col.str.startswith("\t") | col.str.endswith("\t")
n = int(has_ws.sum())
if n == 0:
return True, f"no leading/trailing whitespace in '{column}'"
return False, f"{n:,} '{column}' value(s) have leading/trailing whitespace"
def _check_column_is_int(df, column):
"""Column dtype is integer-like, or all non-null values are whole numbers."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column]
non_null = col.dropna()
if non_null.empty:
return True, f"'{column}' is empty (no values to type-check)"
if pd.api.types.is_integer_dtype(col):
return True, f"all {len(non_null):,} '{column}' values are integers (dtype: {col.dtype})"
# Accept float dtype only if every value happens to be a whole number,
# which is what pandas gives you for an integer column that contains NaN.
if pd.api.types.is_float_dtype(col):
whole = non_null == non_null.astype("int64", errors="ignore")
try:
if whole.all():
return True, (
f"'{column}' is float dtype ({col.dtype}) but all "
f"{len(non_null):,} values are whole numbers"
)
except Exception:
pass
# WARN, not FAIL — dtype mismatch is informational.
return True, (
f"'{column}' has non-integer numeric values (dtype: {col.dtype})"
), "WARN"
return True, f"'{column}' is not integer (dtype: {col.dtype})", "WARN"
def _check_column_is_numeric(df, column):
"""Column dtype is numeric (int or float), or every value coerces cleanly to a number."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column]
non_null = col.dropna()
if non_null.empty:
return True, f"'{column}' is empty (no values to type-check)"
if pd.api.types.is_numeric_dtype(col):
return True, (
f"all {len(non_null):,} '{column}' values are numeric (dtype: {col.dtype})"
)
coerced = pd.to_numeric(col, errors="coerce")
n_unparseable = int((coerced.isna() & col.notna()).sum())
if n_unparseable:
# WARN, not FAIL — dtype mismatch is informational; downstream numeric
# checks (range, positive, equals) catch the unparseable rows directly.
return True, (
f"'{column}' is not numeric (dtype: {col.dtype}); "
f"{n_unparseable:,} value(s) cannot be parsed as numbers"
), "WARN"
return True, f"'{column}' is parseable as numbers (stored as {col.dtype})"
def _check_column_positive(df, column):
"""All non-null values in `column` are strictly positive (> 0)."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
coerced = pd.to_numeric(df[column], errors="coerce")
non_positive = coerced[coerced <= 0]
n_unparseable = int(coerced.isna().sum() - df[column].isna().sum())
if non_positive.empty and n_unparseable == 0:
return True, f"all '{column}' values are positive (> 0)"
parts = []
if not non_positive.empty:
sample = non_positive.dropna().unique()[:5].tolist()
parts.append(f"{len(non_positive):,} value(s) <= 0 (e.g. {sample})")
if n_unparseable:
parts.append(f"{n_unparseable:,} value(s) could not be parsed as numbers")
return False, f"'{column}': " + "; ".join(parts)
def _check_column_is_bool(df, column):
"""Column dtype is boolean."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column]
non_null = col.dropna()
if non_null.empty:
return True, f"'{column}' is empty (no values to type-check)"
if pd.api.types.is_bool_dtype(col):
return True, f"all {len(non_null):,} '{column}' values are booleans (dtype: {col.dtype})"
types = sorted({type(v).__name__ for v in non_null})
# WARN, not FAIL — dtype mismatch is informational; the set-membership
# check (only True/False) catches actual value problems.
return True, (
f"'{column}' is not boolean (dtype: {col.dtype}, value types: {types})"
), "WARN"
def _check_column_in_set(df, column, allowed):
"""All non-null values in `column` belong to `allowed` (a set or iterable)."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column]
non_null = col.dropna()
if non_null.empty:
return True, f"'{column}' is empty"
allowed_set = set(allowed)
bad_mask = ~non_null.isin(allowed_set)
n_bad = int(bad_mask.sum())
if n_bad == 0:
return True, f"all '{column}' values are in {sorted(allowed_set, key=str)}"
sample = non_null[bad_mask].unique()[:5].tolist()
return False, (
f"{n_bad:,} '{column}' value(s) not in {sorted(allowed_set, key=str)} "
f"(e.g. {sample})"
)
def _check_column_at_least(df, column, threshold):
"""All non-null values in `column` are >= `threshold`."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
coerced = pd.to_numeric(df[column], errors="coerce")
below = coerced[coerced < threshold]
n_unparseable = int((coerced.isna() & df[column].notna()).sum())
if below.empty and n_unparseable == 0:
return True, f"all '{column}' values are >= {threshold:,}"
parts = []
if not below.empty:
sample = below.dropna().unique()[:5].tolist()
parts.append(f"{len(below):,} value(s) < {threshold:,} (e.g. {sample})")
if n_unparseable:
parts.append(f"{n_unparseable:,} value(s) could not be parsed as numbers")
return False, f"'{column}': " + "; ".join(parts)
def _check_column_equals(df, column, expected, atol=1e-9):
"""All non-null values in `column` equal `expected` (within float tolerance)."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
coerced = pd.to_numeric(df[column], errors="coerce")
n_unparseable = int((coerced.isna() & df[column].notna()).sum())
# Compare only the rows that successfully coerced
valid = coerced.dropna()
not_equal = valid[~(valid.sub(expected).abs() <= atol)]
if not_equal.empty and n_unparseable == 0:
return True, f"all '{column}' values equal {expected}"
parts = []
if not not_equal.empty:
sample = not_equal.unique()[:5].tolist()
parts.append(f"{len(not_equal):,} value(s) != {expected} (e.g. {sample})")
if n_unparseable:
parts.append(f"{n_unparseable:,} value(s) could not be parsed as numbers")
return False, f"'{column}': " + "; ".join(parts)
def _check_column_in_range(df, column, lo, hi, lo_inclusive=True, hi_inclusive=True):
"""All non-null values in `column` are within the range between `lo` and `hi`.
By default bounds are inclusive on both ends (`[lo, hi]`). Pass
`lo_inclusive=False` / `hi_inclusive=False` to use open bounds (e.g.
`(0, 6)` to require strictly `0 < value < 6`).
"""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
coerced = pd.to_numeric(df[column], errors="coerce")
n_unparseable = int((coerced.isna() & df[column].notna()).sum())
below = coerced < lo if lo_inclusive else coerced <= lo
above = coerced > hi if hi_inclusive else coerced >= hi
out_of_range = coerced[below | above]
n_out = int(out_of_range.shape[0])
range_str = (
f"{'[' if lo_inclusive else '('}{lo}, {hi}{']' if hi_inclusive else ')'}"
)
if n_out == 0 and n_unparseable == 0:
return True, f"all '{column}' values are in {range_str}"
parts = []
if n_out:
sample = out_of_range.dropna().unique()[:5].tolist()
parts.append(f"{n_out:,} value(s) outside {range_str} (e.g. {sample})")
if n_unparseable:
parts.append(f"{n_unparseable:,} value(s) could not be parsed as numbers")
return False, f"'{column}': " + "; ".join(parts)
def _check_column_min_length(df, column, min_length):
"""All non-null values in `column` have length strictly greater than `min_length`."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
col = df[column].dropna().astype(str)
too_short = col[col.str.len() <= min_length]
n = int(too_short.shape[0])
if n == 0:
return True, f"all '{column}' values have length > {min_length}"
sample = too_short.unique()[:5].tolist()
return False, (
f"{n:,} '{column}' value(s) have length <= {min_length} (e.g. {sample})"
)
def _check_column_no_nulls(df, column):
"""No null/NaN values in the column."""
ok, fail = _ensure_df_and_columns(df, column)
if not ok:
return False, fail
n = int(df[column].isna().sum())
if n == 0:
return True, f"no null values in '{column}'"
return False, f"{n:,} null value(s) in '{column}'"
def _check_no_duplicates_per_group(df, column, group_col, output_dir):
"""Within each `group_col` value, no two rows share the same `column` value.
Writes one CSV report per offending `group_col` value (sanitized filename),
listing every offending row. Emits WARN rather than FAIL.
"""
ok, fail = _ensure_df_and_columns(df, column, group_col)
if not ok:
return False, fail
dup_mask = df.duplicated(subset=[group_col, column], keep=False)
if not dup_mask.any():
return True, f"no duplicate '{column}' values within any '{group_col}' group"
n_dup_rows = int(dup_mask.sum())
dup_df = df[dup_mask].copy()
reports = []
if output_dir:
try:
os.makedirs(output_dir, exist_ok=True)
for group_val, group_rows in dup_df.groupby(group_col):
safe = re.sub(r"[^A-Za-z0-9_.\-]", "_", str(group_val))
report_path = os.path.join(
output_dir, f"duplicate_{column}_per_{group_col}_{safe}.csv"
)
group_rows.to_csv(report_path, index=False)
reports.append(os.path.basename(report_path))
except Exception as e:
return True, (
f"found {n_dup_rows:,} duplicate '{column}' rows across "
f"{dup_df[group_col].nunique()} '{group_col}' group(s); "
f"failed to write reports: {e}"
), "WARN"
n_groups = dup_df[group_col].nunique()
return True, (
f"found {n_dup_rows:,} duplicate '{column}' row(s) across "
f"{n_groups} '{group_col}' group(s); reports: {reports}"
), "WARN"
# ---------- COMPOUND_ID checks ----------
def check_compound_id_is_string(file_path, df=None, **_):
"""COMPOUND_ID dtype is string (VARCHAR-equivalent)."""
return _check_column_is_string(df, "COMPOUND_ID")
def check_compound_id_no_whitespace(file_path, df=None, **_):
"""COMPOUND_ID values have no leading/trailing whitespace."""
return _check_column_no_whitespace(df, "COMPOUND_ID")
def check_compound_id_no_nulls(file_path, df=None, **_):
"""COMPOUND_ID has no null values."""
return _check_column_no_nulls(df, "COMPOUND_ID")
def check_compound_id_unique_per_target(file_path, df=None, output_dir=None, **_):
"""Each COMPOUND_ID appears at most once per TARGET_ID."""
return _check_no_duplicates_per_group(df, "COMPOUND_ID", "TARGET_ID", output_dir)
# ---------- SMILES checks ----------
def check_smiles_is_string(file_path, df=None, **_):
"""SMILES dtype is string (VARCHAR-equivalent)."""
return _check_column_is_string(df, "SMILES")
def check_smiles_no_whitespace(file_path, df=None, **_):
"""SMILES values have no leading/trailing whitespace."""
return _check_column_no_whitespace(df, "SMILES")
def check_smiles_no_nulls(file_path, df=None, **_):
"""SMILES has no null values."""
return _check_column_no_nulls(df, "SMILES")
def check_smiles_unique_per_target(file_path, df=None, output_dir=None, **_):
"""Each SMILES appears at most once per TARGET_ID."""
return _check_no_duplicates_per_group(df, "SMILES", "TARGET_ID", output_dir)
def check_smiles_valid(file_path, df=None, output_dir=None, **_):
"""Every SMILES is non-empty and RDKit-parseable.
Isomer groups (parts separated by ';') are split and each component is
validated independently, so a pre-Step-4 row like `'CC;CCC'` is checked
as two valid molecules rather than failing as one malformed SMILES.
Writes `invalid_smiles_report.csv` listing every offending row and the
type of problem (`empty` or `malformed: '<part>'`).
"""
ok, fail = _ensure_df_and_columns(df, "SMILES")
if not ok:
return False, fail
smiles_col = df["SMILES"]
# Identify problems per row. Build maps so we only call MolFromSmiles
# once per unique part across the whole column.
row_parts = {}
unique_parts = set()
for idx, raw in smiles_col.items():
parts = _split_smiles(raw)
row_parts[idx] = parts
unique_parts.update(parts)
invalid_parts = {p for p in unique_parts if Chem.MolFromSmiles(p) is None}
empty_rows = []
malformed_rows = [] # list of (idx, first_invalid_part)
for idx, parts in row_parts.items():
if not parts:
empty_rows.append(idx)
else:
bad = next((p for p in parts if p in invalid_parts), None)
if bad is not None:
malformed_rows.append((idx, bad))
n_empty = len(empty_rows)
n_malformed = len(malformed_rows)
if n_empty == 0 and n_malformed == 0:
return True, f"all {len(df):,} SMILES rows are valid (non-empty, RDKit-parseable)"
report_msg = ""
if output_dir:
try:
os.makedirs(output_dir, exist_ok=True)
offending_indices = sorted(set(empty_rows) | {i for i, _ in malformed_rows})
bad_part_for = {i: p for i, p in malformed_rows}
report_df = df.loc[offending_indices].copy()
report_df.insert(
0, "Issue",
["empty" if i in set(empty_rows) else f"malformed: '{bad_part_for[i]}'"
for i in offending_indices],
)
report_path = os.path.join(output_dir, "invalid_smiles_report.csv")
report_df.to_csv(report_path, index=False)
report_msg = f"; report saved to {os.path.basename(report_path)}"
except Exception as e:
report_msg = f"; failed to write report: {e}"
return False, (
f"found {n_empty:,} empty and {n_malformed:,} malformed SMILES row(s){report_msg}"
)
def check_smiles_in_library(file_path, df=None, masterlist_dir=None, output_dir=None, **_):
"""Every SMILES in the input is present in the master list referenced by
`MasterList_Information.xlsx` for this raw CSV.
Isomer groups are split on ';' and each component is checked against the
library individually. WARN (not FAIL): a SMILES that's not in the
declared library is a data-integrity flag worth surfacing, but does not
block downstream processing.
"""
ok, fail = _ensure_df_and_columns(df, "SMILES")
if not ok:
return False, fail
if not masterlist_dir:
return False, "masterlist_dir not provided; cannot check library membership"
info_path = os.path.join(masterlist_dir, "MasterList_Information.xlsx")
library_smiles = _load_associated_library_smiles(file_path, masterlist_dir, info_path)
if library_smiles is None:
return False, (
"could not load associated library (check MasterList_Information.xlsx "
"FileName/MaterListName mapping and the referenced library file)"
)
smiles_col = df["SMILES"]
missing_rows = [] # list of (idx, first_missing_part)
for idx, raw in smiles_col.items():
for part in _split_smiles(raw):
if part not in library_smiles:
missing_rows.append((idx, part))
break
if not missing_rows:
return True, (
f"all SMILES are in the associated library "
f"({len(library_smiles):,} library SMILES)"
)
report_msg = ""
if output_dir:
try:
os.makedirs(output_dir, exist_ok=True)
offending_indices = sorted({i for i, _ in missing_rows})
missing_part_for = {i: p for i, p in missing_rows}
report_df = df.loc[offending_indices].copy()
report_df.insert(0, "MissingPart", [missing_part_for[i] for i in offending_indices])
report_path = os.path.join(output_dir, "smiles_not_in_library_report.csv")
report_df.to_csv(report_path, index=False)
report_msg = f"; report saved to {os.path.basename(report_path)}"
except Exception as e:
report_msg = f"; failed to write report: {e}"
return True, (
f"found {len(missing_rows):,} row(s) with SMILES not in the associated library{report_msg}"
), "WARN"
# ---------- ASMS_BATCH_NAME checks ----------
# Matches "<provider>_<batchnumber>", e.g. "sgcto_01". Provider is alphabetic,
# batch is digits. Case-insensitive; the actual provider is validated against
# Providers.csv at check time.
_BATCH_NAME_RE = re.compile(r"^([a-zA-Z]+)_(\d+)$")
def check_asms_batch_name_is_string(file_path, df=None, **_):
"""ASMS_BATCH_NAME dtype is string (VARCHAR-equivalent)."""
return _check_column_is_string(df, "ASMS_BATCH_NAME")
def check_asms_batch_name_no_whitespace(file_path, df=None, **_):
"""ASMS_BATCH_NAME values have no leading/trailing whitespace."""
return _check_column_no_whitespace(df, "ASMS_BATCH_NAME")
def check_asms_batch_name_no_nulls(file_path, df=None, **_):
"""ASMS_BATCH_NAME has no null values."""
return _check_column_no_nulls(df, "ASMS_BATCH_NAME")
def check_asms_batch_name_consistent(file_path, df=None, **_):
"""All rows share the same ASMS_BATCH_NAME value (one file = one batch)."""
ok, fail = _ensure_df_and_columns(df, "ASMS_BATCH_NAME")
if not ok:
return False, fail
non_null = df["ASMS_BATCH_NAME"].dropna()
if non_null.empty:
return False, "ASMS_BATCH_NAME column has no non-null values"
unique = non_null.unique()