#! /usr/bin/env python3
"""Find near-duplicate photos.

Images are grouped when they were taken close together in time (EXIF
DateTimeOriginal) and their pHash perceptual hashes are within a
configurable Hamming-distance band.
"""

from collections import namedtuple
import ctypes
from datetime import datetime
from functools import partial, reduce
from itertools import takewhile
import os
from pathlib import Path
import shelve

import click
import exifread

# libpHash is taken from the active Guix profile; GUIX_ENVIRONMENT must be
# set.  str() because ctypes.CDLL historically requires a plain string path.
libphash = ctypes.CDLL(str(Path(os.getenv("GUIX_ENVIRONMENT")) / "lib" / "libpHash.so"))

# Declare the C signatures so ctypes marshals the 64-bit hashes correctly.
# Without argtypes, ctypes passes Python ints as c_int, and any hash value
# >= 2**31 raises an OverflowError when calling ph_hamming_distance.
libphash.ph_dct_imagehash.argtypes = [ctypes.c_char_p,
                                      ctypes.POINTER(ctypes.c_uint64)]
libphash.ph_dct_imagehash.restype = ctypes.c_int
libphash.ph_hamming_distance.argtypes = [ctypes.c_uint64, ctypes.c_uint64]
libphash.ph_hamming_distance.restype = ctypes.c_int

# One cached record per photo: filesystem path, EXIF capture time (a
# datetime, or None when the tag is missing/unparsable), 64-bit pHash.
Image = namedtuple("Image", "path datetime phash")


def read_datetime(path):
    """Return the EXIF DateTimeOriginal of the image at `path` as a
    datetime, or None when the tag is absent or malformed."""
    with path.open("rb") as file:
        tags = exifread.process_file(file, builtin_types=True,
                                     stop_tag="DateTimeOriginal")
    if "EXIF DateTimeOriginal" in tags:
        try:
            # EXIF stores timestamps as "YYYY:MM:DD HH:MM:SS" — colons in
            # the date part, per the EXIF specification.
            return datetime.strptime(tags["EXIF DateTimeOriginal"],
                                     "%Y:%m:%d %H:%M:%S")
        except ValueError:
            # Malformed timestamp: treat like a missing tag rather than
            # aborting the whole scan.
            return None
    return None


def phash(path):
    """Compute the 64-bit DCT perceptual hash of the image at `path`."""
    digest = ctypes.c_uint64()
    libphash.ph_dct_imagehash(str(path).encode(), ctypes.byref(digest))
    return digest.value


def read_image(path, cache):
    """Return the (possibly cached) Image record for `path`.

    The cache is keyed by the resolved absolute path so the same file
    reached via different relative paths hits the same entry.
    """
    key = str(path.resolve())
    if key not in cache:
        cache[key] = Image(path, read_datetime(path), phash(path))
    return cache[key]


def read_images(directory, cache_file):
    """Return Image records for every *.jpg under `directory`
    (recursively), memoized in the shelve database at `cache_file`."""
    with shelve.open(cache_file) as cache:
        return [read_image(path, cache) for path in directory.rglob("*.jpg")]


def hamming_distance(hash1, hash2):
    """Return the Hamming distance (0-64) between two 64-bit pHashes."""
    return libphash.ph_hamming_distance(hash1, hash2)


def image_dissimilarity(image1, image2):
    """Return a dissimilarity score between two Images (0 = identical hash)."""
    # TODO: Maybe handle 90°, 180°, 270° rotations as well.
    return hamming_distance(image1.phash, image2.phash)


def within_time_window(image1, image2, window_width):
    """True when the two images were taken less than `window_width`
    seconds apart."""
    return abs((image1.datetime - image2.datetime).total_seconds()) < window_width


def star_takewhile(pred, iterable):
    """takewhile, but unpacking each element into `pred`'s arguments."""
    return takewhile(lambda x: pred(*x), iterable)


def group_images(images, time_window_width, minimum_hamming_distance, maximum_hamming_distance):
    """Partition `images` (pre-sorted by datetime) into similarity groups.

    Only pairs taken within `time_window_width` seconds of each other are
    compared; a pair is merged into one group when its Hamming distance
    lies in the inclusive band [minimum_hamming_distance,
    maximum_hamming_distance].  Returns a set of frozensets of Images.
    """
    def comparison_pairs():
        # Because `images` is sorted by capture time, once a later image
        # falls outside the window no further image can be inside it —
        # takewhile stops the inner scan early.
        return ((image1, image2)
                for i, image1 in enumerate(images)
                for _, image2 in star_takewhile(
                    lambda i, image: within_time_window(image1, image, time_window_width),
                    enumerate(images[i+1:], i+1)))

    def reducer(similarity_sets, image_pair):
        image1, image2 = image_pair
        if minimum_hamming_distance <= image_dissimilarity(image1, image2) <= maximum_hamming_distance:
            # We assume similarity is transitive—that is, if image A
            # is similar to image B and image B is similar to image C,
            # we assume that image A is similar to image C. This need
            # not be true, but we assume that this is true in most
            # real-world images.
            group1 = next(group for group in similarity_sets if image1 in group)
            group2 = next(group for group in similarity_sets if image2 in group)
            # Union the two sets. If images are already in the same
            # set, don't do anything.
            return similarity_sets - {group1} - {group2} | {group1 | group2}
        else:
            return similarity_sets

    # Start from singleton groups and fold every comparable pair in.
    return reduce(reducer, comparison_pairs(),
                  {frozenset([image]) for image in images})


@click.command()
@click.argument("images-directory", type=click.Path(exists=True))
@click.option("--time-window-width", default=5*60, type=click.INT,
              help="Time window width (in seconds) within which to compare images (default: 300)")
@click.option("--minimum-hamming-distance", default=0, type=click.INT,
              help="Hamming distance above which to group images (ranges from 0–64) (default: 0)")
@click.option("--maximum-hamming-distance", default=10, type=click.INT,
              help="Hamming distance below which to group images (ranges from 0–64) (default: 10)")
@click.option("--cache-file", default="find-duplicates-cache", type=click.Path(),
              help="Cache file path")
def main(images_directory, time_window_width, minimum_hamming_distance, maximum_hamming_distance, cache_file):
    """Scan IMAGES_DIRECTORY and print one line per group of duplicates."""
    # Images without an EXIF timestamp cannot be windowed — drop them.
    images = sorted([image
                     for image in read_images(Path(images_directory), cache_file)
                     if image.datetime],
                    key=lambda image: image.datetime)
    for group in group_images(images, time_window_width,
                              minimum_hamming_distance, maximum_hamming_distance):
        if len(group) > 1:
            print(*{image.path for image in group})


if __name__ == "__main__":
    main()