1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
#! /usr/bin/env python3
from collections import namedtuple
import ctypes
from datetime import datetime
from functools import partial, reduce
from itertools import takewhile
import os
from pathlib import Path
import shelve
import click
import exifread
# Handle to the pHash shared library used for perceptual hashing.
# Inside a Guix environment the library is loaded from
# $GUIX_ENVIRONMENT/lib. When the variable is not set, the bare
# library name is handed to the dynamic loader so that it searches its
# default library path, rather than failing with an opaque TypeError
# from Path(None).
_guix_environment = os.getenv("GUIX_ENVIRONMENT")
libphash = ctypes.CDLL(
    str(Path(_guix_environment) / "lib" / "libpHash.so")
    if _guix_environment is not None
    else "libpHash.so"
)
# Record describing one image file: where it lives (path), the
# timestamp read from its EXIF data (datetime, or None when absent) and
# its perceptual hash (phash).
Image = namedtuple("Image", ["path", "datetime", "phash"])
def read_datetime(path):
    """Return the EXIF "DateTimeOriginal" timestamp of the file at path.

    The file is opened in binary mode and handed to exifread, which is
    told to stop at the DateTimeOriginal tag. The tag value is parsed
    with the format "%Y-%m-%d %H:%M:%S".

    Returns:
        A datetime, or None when the file has no
        "EXIF DateTimeOriginal" tag.
    """
    with path.open("rb") as image_file:
        exif_tags = exifread.process_file(image_file,
                                          builtin_types=True,
                                          stop_tag="DateTimeOriginal")
    if "EXIF DateTimeOriginal" not in exif_tags:
        return None
    timestamp = exif_tags["EXIF DateTimeOriginal"]
    return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
def phash(path):
    """Return the 64-bit DCT perceptual hash of the image at path.

    The hash is computed by libpHash's ph_dct_imagehash, which writes
    its result into a 64-bit unsigned integer passed by reference.

    Args:
        path: Location of the image file (str or path-like).

    Returns:
        The hash as a non-negative Python int.

    Raises:
        OSError: If libpHash reports a failure (negative status), for
            example because the file could not be loaded as an image.
    """
    digest = ctypes.c_uint64()
    # os.fsencode produces the exact bytes the operating system uses for
    # the file name, including names that are not valid UTF-8, which
    # str(path).encode() would reject with UnicodeEncodeError.
    status = libphash.ph_dct_imagehash(os.fsencode(path), ctypes.byref(digest))
    # A negative status means no hash was written. Returning the
    # untouched zero would make every unreadable image look identical to
    # every other one, so surface the failure instead.
    if status < 0:
        raise OSError(f"pHash could not compute a hash for {path}")
    return digest.value
def read_image(path, cache):
    """Return the Image record for path, computing it only on a cache miss.

    Args:
        path: Path object pointing at the image file.
        cache: Mutable mapping from the resolved path (as a string) to
            Image records; it is updated in place on a miss.

    Returns:
        The record stored in cache for this path.
    """
    cache_key = str(path.resolve())
    if cache_key in cache:
        return cache[cache_key]
    # Cache miss: read the EXIF timestamp and the perceptual hash once
    # and remember them under the resolved path.
    cache[cache_key] = Image(path, read_datetime(path), phash(path))
    return cache[cache_key]
def read_images(directory, cache_file):
    """Return Image records for every "*.jpg" file below directory.

    Args:
        directory: Path object of the directory to search recursively.
        cache_file: File name of the shelve database used to cache
            records between runs.

    Returns:
        A list with one record per matching file, in the order that
        rglob yields them.
    """
    with shelve.open(cache_file) as cache:
        records = []
        for jpeg_path in directory.rglob("*.jpg"):
            records.append(read_image(jpeg_path, cache))
        return records
def hamming_distance(hash1, hash2):
    """Return the number of differing bits between two 64-bit hashes.

    This is computed in Python rather than through libpHash's
    ph_hamming_distance: no argtypes are declared for that function, and
    ctypes passes plain Python integers as C int in that case, so the
    upper half of each 64-bit hash would be lost before the comparison.

    Args:
        hash1: First hash, as a non-negative int of at most 64 bits.
        hash2: Second hash, as a non-negative int of at most 64 bits.

    Returns:
        An int between 0 and 64.
    """
    # Mask to 64 bits so that stray higher bits cannot push the result
    # past the documented 0-64 range.
    differing_bits = (hash1 ^ hash2) & 0xFFFFFFFFFFFFFFFF
    return bin(differing_bits).count("1")
def image_dissimilarity(image1, image2):
    """Return how dissimilar two images are.

    The measure is the Hamming distance between the phash attributes of
    the two images; a smaller value means the hashes are closer.
    """
    # TODO: Maybe handle 90°, 180°, 270° rotations as well.
    first_hash, second_hash = image1.phash, image2.phash
    return hamming_distance(first_hash, second_hash)
def within_time_window(image1, image2, window_width):
    """Tell whether two images were taken less than window_width seconds apart.

    Args:
        image1: Object with a datetime attribute.
        image2: Object with a datetime attribute.
        window_width: Width of the window in seconds; the bound is
            exclusive.

    Returns:
        True when the absolute difference between the two timestamps is
        strictly smaller than window_width, False otherwise.
    """
    elapsed_seconds = (image1.datetime - image2.datetime).total_seconds()
    return -window_width < elapsed_seconds < window_width
def star_takewhile(pred, iterable):
    """Like itertools.takewhile, but unpack each item into pred's arguments.

    Args:
        pred: Callable invoked as pred(*item) for every item.
        iterable: Iterable of argument tuples.

    Returns:
        A lazy iterator over the leading items for which pred is true.
    """
    def unpacking_pred(item):
        return pred(*item)

    return takewhile(unpacking_pred, iterable)
def group_images(images, time_window_width, minimum_hamming_distance, maximum_hamming_distance):
    """Partition images into sets of images considered similar.

    Every image is compared with the images that follow it in the
    sequence, stopping at the first one that lies outside the time
    window (the caller in this file passes images sorted by datetime).
    Two images are linked when their dissimilarity lies between
    minimum_hamming_distance and maximum_hamming_distance, inclusive.

    Similarity is assumed to be transitive: if image A is similar to
    image B and image B is similar to image C, then A, B and C end up in
    the same group. This need not be true, but it is assumed to hold for
    most real-world images.

    Args:
        images: Sequence of hashable image records.
        time_window_width: Window width in seconds within which images
            are compared.
        minimum_hamming_distance: Smallest dissimilarity that links two
            images.
        maximum_hamming_distance: Largest dissimilarity that links two
            images.

    Returns:
        A set of frozensets; every distinct image appears in exactly one
        of them, and images without a similar partner form singletons.
    """
    # Give every distinct image a node number. Equal records share one
    # node, exactly as they would share one element in a set.
    node_of = {}
    for image in images:
        node_of.setdefault(image, len(node_of))

    # Disjoint-set forest over the node numbers: merging two groups and
    # looking up a group are near-constant-time operations, instead of a
    # scan over and a rebuild of the whole collection of groups.
    parent = list(range(len(node_of)))

    def find(node):
        """Return the representative node of the group containing node."""
        while parent[node] != node:
            # Path halving keeps the trees shallow.
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for i, image1 in enumerate(images):
        # Walk forward by index rather than slicing, so that no copy of
        # the remainder of the list is made for every image.
        for j in range(i + 1, len(images)):
            image2 = images[j]
            if not within_time_window(image1, image2, time_window_width):
                break
            root1 = find(node_of[image1])
            root2 = find(node_of[image2])
            if root1 == root2:
                # Already grouped together; nothing to merge.
                continue
            if minimum_hamming_distance <= image_dissimilarity(image1, image2) <= maximum_hamming_distance:
                parent[root2] = root1

    members_by_root = {}
    for image, node in node_of.items():
        members_by_root.setdefault(find(node), []).append(image)
    return {frozenset(members) for members in members_by_root.values()}
@click.command()
@click.argument("images-directory", type=click.Path(exists=True))
@click.option(
    "--time-window-width",
    default=5 * 60,
    type=click.INT,
    help="Time window width (in seconds) within which to compare images (default: 300)",
)
@click.option(
    "--minimum-hamming-distance",
    default=0,
    type=click.INT,
    help="Hamming distance above which to group images (ranges from 0–64) (default: 0)",
)
@click.option(
    "--maximum-hamming-distance",
    default=10,
    type=click.INT,
    help="Hamming distance below which to group images (ranges from 0–64) (default: 10)",
)
@click.option(
    "--cache-file",
    default="find-duplicates-cache",
    type=click.Path(),
    help="Cache file path",
)
def main(images_directory, time_window_width, minimum_hamming_distance, maximum_hamming_distance, cache_file):
    # Command-line entry point: read the images below images_directory,
    # group the similar ones and print each group of two or more images
    # as one line of paths.
    #
    # Images for which no timestamp was read are dropped, since they
    # cannot be placed in a time window; the rest are put in
    # chronological order before grouping.
    dated_images = [image
                    for image in read_images(Path(images_directory), cache_file)
                    if image.datetime]
    dated_images.sort(key=lambda image: image.datetime)
    groups = group_images(dated_images,
                          time_window_width,
                          minimum_hamming_distance,
                          maximum_hamming_distance)
    for group in groups:
        # A singleton group is an image without any similar partner.
        if len(group) <= 1:
            continue
        print(*{image.path for image in group})
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|