nlp_dedup.cli

Command line interface for deduplicating a text corpus.

View Source
"""Command line interface for deduplicating a text corpus."""

import json
import subprocess
from pathlib import Path
from typing import Any, Dict, Generator, Union

import click

from .deduper import Deduper


@click.command()
@click.argument("corpus", type=click.Path(exists=True))
@click.argument("output-dir", type=click.Path())
@click.option(
    "--split-method",
    type=click.Choice(["word_ngram", "paragraph", "none"]),
    default="word_ngram",
    show_default=True,
    help="The method to split the documents into shingles.",
)
@click.option(
    "--ngram-size",
    type=int,
    default=13,
    show_default=True,
    help="The size of the ngrams to use for the word_ngram split method.",
)
@click.option(
    "--ngram-stride",
    type=int,
    default=1,
    show_default=True,
    help="The stride of the ngrams to use for the word_ngram split method.",
)
@click.option(
    "--similarity-threshold",
    type=float,
    default=0.8,
    show_default=True,
    help="The similarity threshold to use for the deduplication.",
)
@click.option(
    "--num-minhashes",
    type=int,
    default=128,
    show_default=True,
    help="The number of minhashes to use for the deduplication.",
)
@click.option(
    "--batch-size",
    type=int,
    default=1_000_000,
    show_default=True,
    help="The number of documents to process at once.",
)
@click.option(
    "--n-jobs",
    type=int,
    default=-1,
    show_default=True,
    help="The number of jobs to use for the deduplication.",
)
@click.option(
    "--random-seed",
    type=int,
    default=4242,
    show_default=True,
    help="The random seed to use for the deduplication.",
)
@click.option(
    "--store-corpus-to-disk/--no-store-corpus-to-disk",
    type=bool,
    default=True,
    show_default=True,
    help="Whether to store the corpus to disk.",
)
@click.option(
    "--store-mask-to-disk/--no-store-mask-to-disk",
    type=bool,
    default=False,
    show_default=True,
    help="Whether to store the mask to disk.",
)
@click.option(
    "--store-lsh-cache-to-disk/--no-store-lsh-cache-to-disk",
    type=bool,
    default=False,
    show_default=True,
    help="Whether to store the LSH cache to disk.",
)
@click.option(
    "--store-config-to-disk/--no-store-config-to-disk",
    type=bool,
    default=True,
    show_default=True,
    help="Whether to store the config to disk.",
)
@click.option(
    "--verbose/--no-verbose",
    type=bool,
    default=True,
    show_default=True,
    help="Whether to print output.",
)
@click.option(
    "--text-column",
    type=str,
    default="text",
    show_default=True,
    help="""The name of the column containing the text, if the entries in the corpus
    are dictionaries.""",
)
@click.option(
    "--overwrite/--no-overwrite",
    type=bool,
    default=False,
    show_default=True,
    help="Whether to overwrite the output directory if it already exists.",
)
def main(
    corpus: str,
    split_method: str,
    ngram_size: int,
    ngram_stride: int,
    similarity_threshold: float,
    num_minhashes: int,
    batch_size: int,
    n_jobs: int,
    random_seed: int,
    store_corpus_to_disk: bool,
    store_mask_to_disk: bool,
    store_lsh_cache_to_disk: bool,
    store_config_to_disk: bool,
    verbose: bool,
    text_column: str,
    output_dir: str,
    overwrite: bool,
) -> None:
    """Deduplicate a text corpus.

    Args:
        corpus (str):
            The path to the file containing the text corpus to deduplicate.
        split_method (str):
            The method to split the documents into shingles.
        ngram_size (int):
            The size of the ngrams to use for the word_ngram split method.
        ngram_stride (int):
            The stride of the ngrams to use for the word_ngram split method.
        similarity_threshold (float):
            The similarity threshold to use for the deduplication.
        num_minhashes (int):
            The number of minhashes to use for the deduplication.
        batch_size (int):
            The number of documents to process at once.
        n_jobs (int):
            The number of jobs to use for the deduplication.
        random_seed (int):
            The random seed to use for the deduplication.
        store_corpus_to_disk (bool):
            Whether to store the corpus to disk.
        store_mask_to_disk (bool):
            Whether to store the mask to disk.
        store_lsh_cache_to_disk (bool):
            Whether to store the LSH cache to disk.
        store_config_to_disk (bool):
            Whether to store the config to disk.
        verbose (bool):
            Whether to print output.
        text_column (str):
            The name of the column containing the text, if the entries in the corpus
            are dictionaries.
        output_dir (str):
            The directory to store the deduplicated corpus in.
        overwrite (bool):
            Whether to overwrite the output directory if it already exists.
    """
    # Initialise the Deduper
    deduper = Deduper(
        split_method=split_method,
        ngram_size=ngram_size,
        ngram_stride=ngram_stride,
        similarity_threshold=similarity_threshold,
        num_minhashes=num_minhashes,
        batch_size=batch_size,
        n_jobs=n_jobs,
        random_seed=random_seed,
        store_corpus_to_disk=store_corpus_to_disk,
        store_mask_to_disk=store_mask_to_disk,
        store_lsh_cache_to_disk=store_lsh_cache_to_disk,
        store_config_to_disk=store_config_to_disk,
        verbose=verbose,
    )

    # Count the number of lines in the corpus
    proc = subprocess.Popen(
        ["wc", "-l", corpus], stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    num_docs = int(proc.communicate()[0].split()[0])

    # Create generator for the corpus
    def corpus_generator() -> Generator[Union[str, Dict[str, Any]], None, None]:
        with Path(corpus).open() as f:
            for line in f:
                try:
                    yield json.loads(line)
                except json.JSONDecodeError:
                    yield line.strip("\n")

    # Deduplicate the corpus
    deduper.deduplicate(
        corpus=corpus_generator(),
        text_column=text_column,
        output_dir=output_dir,
        overwrite=overwrite,
        num_docs=num_docs,
    )