diff options
author | Arun Isaac | 2025-09-29 13:55:46 +0100 |
---|---|---|
committer | Arun Isaac | 2025-09-29 14:01:51 +0100 |
commit | 9766e70f7fea16642fbf53272d7775bedd846a48 (patch) | |
tree | 5cc66866c01ef94819ef7c3773a3d0ebecc660d2 /find-duplicates | |
download | photo-utils-9766e70f7fea16642fbf53272d7775bedd846a48.tar.gz photo-utils-9766e70f7fea16642fbf53272d7775bedd846a48.tar.lz photo-utils-9766e70f7fea16642fbf53272d7775bedd846a48.zip |
Diffstat (limited to 'find-duplicates')
-rwxr-xr-x | find-duplicates | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/find-duplicates b/find-duplicates new file mode 100755 index 0000000..ba62f2e --- /dev/null +++ b/find-duplicates @@ -0,0 +1,113 @@ +#! /usr/bin/env python3 + +from collections import namedtuple +import ctypes +from datetime import datetime +from functools import partial, reduce +from itertools import takewhile +import os +from pathlib import Path +import shelve + +import click +import exifread + +libphash = ctypes.CDLL(Path(os.getenv("GUIX_ENVIRONMENT")) / "lib" / "libpHash.so") + +Image = namedtuple("Image", "path datetime phash") + +def read_datetime(path): + with path.open("rb") as file: + tags = exifread.process_file(file, + builtin_types=True, + stop_tag="DateTimeOriginal") + if "EXIF DateTimeOriginal" in tags: + return datetime.strptime(tags["EXIF DateTimeOriginal"], + "%Y-%m-%d %H:%M:%S") + else: + return None + +def phash(path): + hash = ctypes.c_uint64() + libphash.ph_dct_imagehash(str(path).encode(), ctypes.byref(hash)) + return hash.value + +def read_image(path, cache): + key = str(path.resolve()) + if key not in cache: + cache[key] = Image(path, read_datetime(path), phash(path)) + return cache[key] + +def read_images(directory, cache_file): + with shelve.open(cache_file) as cache: + return [read_image(path, cache) + for path in directory.rglob("*.jpg")] + +def hamming_distance(hash1, hash2): + return libphash.ph_hamming_distance(hash1, hash2) + +def image_dissimilarity(image1, image2): + # TODO: Maybe handle 90°, 180°, 270° rotations as well. + return hamming_distance(image1.phash, image2.phash) + +def within_time_window(image1, image2, window_width): + return abs((image1.datetime - image2.datetime).total_seconds()) < window_width + +def star_takewhile(pred, iterable): + return takewhile(lambda x: pred(*x), iterable) + +def group_images(images, time_window_width, minimum_hamming_distance, maximum_hamming_distance): + def comparison_pairs(): + return ((image1, image2) + for i, image1 in enumerate(images) + for j, image2 in star_takewhile(lambda i, image: within_time_window(image1, image, time_window_width), + enumerate(images[i+1:], i+1))) + + def reducer(similarity_sets, image_pair): + image1, image2 = image_pair + if minimum_hamming_distance <= image_dissimilarity(image1, image2) <= maximum_hamming_distance: + # We assume similarity is transitive—that is, if image A + # is similar to image B and image B is similar to image C, + # we assume that image A is similar to image C. This need + # not be true, but we assume that this is true in most + # real-world images. + set1 = next(set for set in similarity_sets if image1 in set) + set2 = next(set for set in similarity_sets if image2 in set) + # Union the two sets. If images are already in the same + # set, don't do anything. + return similarity_sets - {set1} - {set2} | {set1 | set2} + else: + return similarity_sets + + return reduce(reducer, + comparison_pairs(), + {frozenset([image]) for image in images}) + +@click.command() +@click.argument("images-directory", type=click.Path(exists=True)) +@click.option("--time-window-width", + default=5*60, + type=click.INT, + help="Time window width (in seconds) within which to compare images (default: 300)") +@click.option("--minimum-hamming-distance", + default=0, + type=click.INT, + help="Hamming distance above which to group images (ranges from 0–64) (default: 0)") +@click.option("--maximum-hamming-distance", + default=10, + type=click.INT, + help="Hamming distance below which to group images (ranges from 0–64) (default: 10)") +@click.option("--cache-file", + default="find-duplicates-cache", + type=click.Path(), + help="Cache file path") +def main(images_directory, time_window_width, minimum_hamming_distance, maximum_hamming_distance, cache_file): + images = sorted([image for image in read_images(Path(images_directory), cache_file) + if image.datetime], + key=lambda image: image.datetime) + for set in group_images(images, time_window_width, minimum_hamming_distance, maximum_hamming_distance): + if len(set) > 1: + print(*{image.path for image in set}) + +if __name__ == "__main__": + main() |