Skip to content

bolig_ping.cache

docs module bolig_ping.cache

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""Cache to store already sent homes."""

import json
from pathlib import Path

from .data_models import Home


def store_to_cache(
    homes: list[Home], emails: list[str], cache_path: Path = Path(".bolig_ping_cache")
) -> None:
    """Store the homes to the cache.

    Args:
        homes:
            The homes to store in the cache.
        emails:
            The receiver(s) of the homes.
        cache_path (optional):
            The path to the cache file. Defaults to ".bolig_ping_cache".
    """
    homes = remove_cached_homes(homes=homes, emails=emails, cache_path=cache_path)
    for email in emails:
        added_homes: set[tuple[str, str]] = set()
        with cache_path.open("a") as file:
            for home in homes:
                home_id = home.url.split("/")[-1]
                if (home_id, email) in added_homes:
                    continue
                home_json = json.dumps(dict(id=home_id, email=email))
                file.write(f"{home_json}\n")
                added_homes.add((home_id, email))


def remove_cached_homes(
    homes: list[Home], emails: list[str], cache_path: Path = Path(".bolig_ping_cache")
) -> list[Home]:
    """Remove the cached homes from the list of homes.

    Args:
        homes:
            The homes to remove the cached homes from.
        emails:
            The receiver(s) of the homes.
        cache_path (optional):
            The path to the cache file. Defaults to ".bolig_ping_cache".

    Returns:
        The homes without the cached homes.
    """
    cache_path.touch(exist_ok=True)
    with cache_path.open() as file:
        for line in file:
            json_data = json.loads(line)
            if any(json_data["email"] == email for email in emails):
                homes = [
                    home for home in homes if home.url.split("/")[-1] != json_data["id"]
                ]
    return homes