about summary refs log tree commit diff
path: root/find-duplicates
diff options
context:
space:
mode:
Diffstat (limited to 'find-duplicates')
-rwxr-xr-x  find-duplicates  113
1 files changed, 113 insertions, 0 deletions
diff --git a/find-duplicates b/find-duplicates
new file mode 100755
index 0000000..ba62f2e
--- /dev/null
+++ b/find-duplicates
@@ -0,0 +1,113 @@
+#! /usr/bin/env python3
+
+from collections import namedtuple
+import ctypes
+from datetime import datetime
+from functools import partial, reduce
+from itertools import takewhile
+import os
+from pathlib import Path
+import shelve
+
+import click
+import exifread
+
+libphash = ctypes.CDLL(Path(os.getenv("GUIX_ENVIRONMENT")) / "lib" / "libpHash.so")
+
+Image = namedtuple("Image", "path datetime phash")
+
+def read_datetime(path):
+    with path.open("rb") as file:
+        tags = exifread.process_file(file,
+                                     builtin_types=True,
+                                     stop_tag="DateTimeOriginal")
+        if "EXIF DateTimeOriginal" in tags:
+            return datetime.strptime(tags["EXIF DateTimeOriginal"],
+                                     "%Y-%m-%d %H:%M:%S")
+        else:
+            return None
+
+def phash(path):
+    hash = ctypes.c_uint64()
+    libphash.ph_dct_imagehash(str(path).encode(), ctypes.byref(hash))
+    return hash.value
+
+def read_image(path, cache):
+    key = str(path.resolve())
+    if key not in cache:
+        cache[key] = Image(path, read_datetime(path), phash(path))
+    return cache[key]
+
+def read_images(directory, cache_file):
+    with shelve.open(cache_file) as cache:
+        return [read_image(path, cache)
+                for path in directory.rglob("*.jpg")]
+
+def hamming_distance(hash1, hash2):
+    return libphash.ph_hamming_distance(hash1, hash2)
+
+def image_dissimilarity(image1, image2):
+    # TODO: Maybe handle 90°, 180°, 270° rotations as well.
+    return hamming_distance(image1.phash, image2.phash)
+
+def within_time_window(image1, image2, window_width):
+    return abs((image1.datetime - image2.datetime).total_seconds()) < window_width
+
+def star_takewhile(pred, iterable):
+    return takewhile(lambda x: pred(*x), iterable)
+
+def group_images(images, time_window_width, minimum_hamming_distance, maximum_hamming_distance):
+    def comparison_pairs():
+        return ((image1, image2)
+                for i, image1 in enumerate(images)
+                for j, image2 in star_takewhile(lambda i, image: within_time_window(image1, image, time_window_width),
+                                                enumerate(images[i+1:], i+1)))
+
+    def reducer(similarity_sets, image_pair):
+        image1, image2 = image_pair
+        if minimum_hamming_distance <= image_dissimilarity(image1, image2) <= maximum_hamming_distance:
+            # We assume similarity is transitive—that is, if image A
+            # is similar to image B and image B is similar to image C,
+            # we assume that image A is similar to image C. This need
+            # not be true, but we assume that this is true in most
+            # real-world images.
+            set1 = next(set for set in similarity_sets if image1 in set)
+            set2 = next(set for set in similarity_sets if image2 in set)
+            # Union the two sets. If images are already in the same
+            # set, don't do anything.
+            return similarity_sets - {set1} - {set2} | {set1 | set2}
+        else:
+            return similarity_sets
+
+    return reduce(reducer,
+                  comparison_pairs(),
+                  {frozenset([image]) for image in images})
+
+@click.command()
+@click.argument("images-directory", type=click.Path(exists=True))
+@click.option("--time-window-width",
+              default=5*60,
+              type=click.INT,
+              help="Time window width (in seconds) within which to compare images (default: 300)")
+@click.option("--minimum-hamming-distance",
+              default=0,
+              type=click.INT,
+              help="Hamming distance above which to group images (ranges from 0–64) (default: 0)")
+@click.option("--maximum-hamming-distance",
+              default=10,
+              type=click.INT,
+              help="Hamming distance below which to group images (ranges from 0–64) (default: 10)")
+@click.option("--cache-file",
+              default="find-duplicates-cache",
+              type=click.Path(),
+              help="Cache file path")
+def main(images_directory, time_window_width, minimum_hamming_distance, maximum_hamming_distance, cache_file):
+    images = sorted([image for image in read_images(Path(images_directory), cache_file)
+                     if image.datetime],
+                    key=lambda image: image.datetime)
+    for set in group_images(images, time_window_width, minimum_hamming_distance, maximum_hamming_distance):
+        if len(set) > 1:
+            print(*{image.path for image in set})
+
+if __name__ == "__main__":
+    main()