Support deduplication of various formats #5
@@ -1,15 +1,116 @@
from tqdm.autonotebook import tqdm
from multiprocessing import Pool
from datasketch import MinHash
from typing import Optional, cast
from glob import glob
import logging

import pickle
import json
from functools import partial
import os
import collections.abc
from pathlib import Path
import zstandard
import gzip
import io
import pyarrow.parquet
import itertools


# TODO check if minhashes already exist, recompute only if forced

def compute_minhash_text(t: tuple[int, str], fname: str, num_perm: int) -> tuple[str, MinHash] | None:
    lineNo, line = t
    # hash the set of whitespace-separated tokens in this document
    s = set(line.split())
    if not s:
        return None
    m = MinHash(num_perm=num_perm)
    for d in s:
        m.update(d.encode("utf-8"))  # MinHash.update expects bytes, not str
    # generate a unique key for this document
    key = f"{fname}-{lineNo}"
    return (key, m)

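# (Example, not part of the diff) A sketch of how compute_minhash_text behaves:
# two lines sharing most tokens yield MinHashes whose jaccard() estimate
# approximates the Jaccard similarity of their token sets. num_perm=128 and the
# sample strings are illustrative only.
#
#   a = compute_minhash_text((0, "the quick brown fox jumps"), fname="demo", num_perm=128)
#   b = compute_minhash_text((1, "the quick brown fox leaps"), fname="demo", num_perm=128)
#   if a and b:
#       print(a[1].jaccard(b[1]))  # roughly 4/6, the token-set overlap
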
# every (format, compression) pair we can open: .json(l), .json(l).gz,
# .json(l).zst(d), and .parquet
SUPPORTED_FILETYPES = set([
    *[
        ((format, encoding) if encoding != "" else (format,))
        for format, encoding
        in itertools.product(
            [".jsonl", ".json"],
            [".gz", ".zstd", ".zst", ""]
        )],
    (".parquet",)
])


def is_supported_dataset(p: Path) -> bool:
    return tuple(p.suffixes[-2:]) in SUPPORTED_FILETYPES

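# (Example, not part of the diff) Matching uses only the last two suffixes,
# so hypothetical paths behave like this:
#
#   is_supported_dataset(Path("data/shard0.jsonl.zst"))  # True
#   is_supported_dataset(Path("data/shard0.parquet"))    # True
#   is_supported_dataset(Path("data/shard0.csv"))        # False
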
def open_anydataset(p: Path) -> collections.abc.Generator[tuple[int, str], None, None]:
    match p.suffixes[-2:]:
        case [(".json" | ".jsonl")]:
            with open(p, "r") as f:
                for lineNo, line in enumerate(f):
                    try:
                        yield lineNo, cast(str, json.loads(line)["text"])
                    # "except Exception" leaves GeneratorExit (a BaseException)
                    # alone, so the generator can still be closed cleanly
                    except Exception:
                        logging.exception("failed to parse lineNo %s of \"%s\"", lineNo, p)
        case [(".jsonl" | ".json"), (".zst" | ".zstd")]:
            with open(p, "rb") as f:
                dctx = zstandard.ZstdDecompressor()
                stream_reader = dctx.stream_reader(f)
                text_stream = io.TextIOWrapper(stream_reader, encoding="utf-8")
                for lineNo, line in enumerate(text_stream):
                    try:
                        yield lineNo, cast(str, json.loads(line)["text"])
                    except Exception:
                        logging.exception("failed to parse lineNo %s of \"%s\"", lineNo, p)
        case [(".jsonl" | ".json"), ".gz"]:
            # "rt" decodes to text so json.loads sees str lines
            with gzip.open(p, "rt") as f:
                for lineNo, line in enumerate(f):
                    try:
                        yield lineNo, cast(str, json.loads(line)["text"])
                    except Exception:
                        logging.exception("failed to parse lineNo %s of \"%s\"", lineNo, p)
        case [".parquet"]:
            pq = pyarrow.parquet.ParquetFile(p)
            idx = 0
            for row_group in range(pq.num_row_groups):
                table = pq.read_row_group(row_group, columns=["text"])
                for v in table["text"]:
                    try:
                        yield idx, v.as_py()
                    except Exception:
                        logging.exception("failed to parse row %s of \"%s\"", idx, p)
                    idx += 1  # count rows even when one fails to parse
        case _:
            raise NotImplementedError(f"{p.suffixes[-2:]} is not a supported filetype")

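# (Example, not part of the diff) Whatever the on-disk format, iteration looks
# the same; the path below is hypothetical:
#
#   for lineNo, text in open_anydataset(Path("data/shard0.jsonl.gz")):
#       print(lineNo, text[:40])
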
def compute_minhash_for_anyfile(infile: str, output_dir: str, num_perm: int):
Collaborator
Since this covers all filetypes, perhaps we should delete

Author
I wasn't sure if this was used elsewhere or upstream, or if this was something you wrote. If you wrote this

Collaborator
Yeah, I think we can refactor this here. The only other thing to note is that the MinHasher class has another
    n = 50000  # rough total so tqdm can render a bar; the true count is unknown upfront
    path = Path(infile)
    fin = open_anydataset(path)
    with Pool(32) as p, tqdm(total=n, desc=path.stem) as pbar:
        minhash_list = []
        partial_compute_minhash = partial(compute_minhash_text, fname=path.stem, num_perm=num_perm)
        for result in p.imap_unordered(partial_compute_minhash, fin):
            if result:
                minhash_list.append(result)
            pbar.update()
    # strip every recognized suffix (".jsonl.zst", ".parquet", ...) so the
    # output is always "<name>.pkl", even for single-suffix inputs
    base = path.name.removesuffix("".join(path.suffixes[-2:]))
    with open(f"{output_dir}/{base}.pkl", "wb") as fp:
        pickle.dump(minhash_list, fp)
    print(f"Generated MinHash for {len(minhash_list):,} documents in {path.stem}")


def compute_minhash_jsonl(t, fname, num_perm):
    lineNo, line = t
    lineNo += 1
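The pickled (key, MinHash) lists are only the first half of deduplication. Below is a minimal downstream sketch, not part of this PR, assuming datasketch's MinHashLSH, a hypothetical shard path, and an illustrative similarity threshold of 0.9:

import pickle
from datasketch import MinHashLSH

# hypothetical shard path; the PR writes one .pkl per input file
with open("minhashes/shard0.pkl", "rb") as fp:
    minhash_list = pickle.load(fp)

# num_perm must match the value used when the MinHashes were computed
lsh = MinHashLSH(threshold=0.9, num_perm=128)
duplicates = set()
for key, m in minhash_list:
    if lsh.query(m):  # any already-indexed key this similar marks a near-duplicate
        duplicates.add(key)
    else:
        lsh.insert(key, m)
print(f"flagged {len(duplicates):,} near-duplicate documents")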