-
Notifications
You must be signed in to change notification settings - Fork 3
/
PriusImageCache.py
112 lines (81 loc) · 2.64 KB
/
PriusImageCache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import io
import os
import re
import time
from abc import ABC, ABCMeta, abstractmethod
from datetime import datetime, timedelta
from time import mktime
from typing import Any, Union
import cv2
import dhash
from PIL import Image
class Deduplication(ABC):
    """Abstract base for image de-duplication built on difference hashes.

    Concrete subclasses define how an image source is hashed
    (``get_image_hash``), how a hash is stored (``put_hash``) and how a new
    image is checked against the stored hashes (``is_image_duplicate``).
    Inheriting from ``ABC`` already installs ``ABCMeta`` as the metaclass, so
    no explicit metaclass declaration is needed.
    """

    # Constructor
    def __init__(self):
        # Maps a cache key to a metadata dict. get_hash() expects every entry
        # to carry an 'image_hash' item.
        self.cache = {}

    # Abstract Methods
    @abstractmethod
    def is_image_duplicate(self, src, cam_id=''):
        """Return True when ``src`` matches an image already in the cache."""

    @abstractmethod
    def get_image_hash(self, src):
        """Return the hash of ``src`` as a hexadecimal string.

        compare_images() parses the result with ``int(value, 16)``.
        """

    @abstractmethod
    def put_hash(self, src, cam_id=''):
        """Hash ``src`` and store the result in ``self.cache``."""

    # Concrete Methods
    def get_hash_key(self, src):
        """Derive the cache key from an image name.

        The key is the full text matched by the pattern ``_(.*)\\.jpg``,
        including the leading underscore and the ``.jpg`` extension
        (``'cam_0042.jpg'`` gives ``'_0042.jpg'``).

        Raises:
            ValueError: if ``src`` contains no ``_<something>.jpg`` portion.
        """
        match = re.search(r'_(.*)\.jpg', src)
        if match is None:
            # Fail with a message that names the offending input instead of
            # an AttributeError on None.
            raise ValueError(
                "cannot derive a hash key from {!r}: expected '_<name>.jpg'".format(src)
            )
        return match.group()

    def get_hash(self, src):
        """Return the cached hash stored under the key derived from ``src``.

        Raises:
            KeyError: if nothing is cached under that key.
        """
        return self.cache[self.get_hash_key(src)]['image_hash']

    def compare_images(self, image1, image2):
        """Return True when the two images' hashes differ by at most 3 bits."""
        hash1 = self.get_image_hash(image1)
        hash2 = self.get_image_hash(image2)
        distance = dhash.get_num_bits_different(int(hash1, 16), int(hash2, 16))
        return distance <= 3
class PathDeduplication(Deduplication):
    """De-duplication for images referenced by file path.

    Cache entries are keyed by ``get_hash_key(src)``, i.e. by a portion of
    the file name.
    """

    def put_hash(self, src, cam_id=''):
        """Hash the image at ``src`` and cache it under its file-name key.

        ``cam_id`` is accepted for interface compatibility and is not used.
        """
        image_hash = self.get_image_hash(src)
        # The timestamp records the local time at which the entry was stored.
        meta = dict(timestamp=time.localtime(), image_hash=image_hash)
        self.cache[self.get_hash_key(src)] = meta

    def is_image_duplicate(self, src, cam_id=''):
        """Return True when ``src`` is within 3 hash bits of its cached entry.

        An image whose key has never been cached is not a duplicate. The
        lookup uses ``dict.get`` so that this case returns False instead of
        raising ``KeyError``.
        """
        meta = self.cache.get(self.get_hash_key(src))
        if meta is None:
            return False
        new_hash = self.get_image_hash(src)
        diff = dhash.get_num_bits_different(int(meta['image_hash'], 16), int(new_hash, 16))
        return diff <= 3

    def get_image_hash(self, src):
        """Return the hex-formatted difference hash of the image at ``src``."""
        # The context manager closes the underlying file once hashing is done.
        with Image.open(src) as image:
            row, col = dhash.dhash_row_col(image)
        return dhash.format_hex(row, col)
class ImageDeduplication(Deduplication):
    """De-duplication for in-memory encoded images, one cache slot per camera.

    ``src`` is wrapped in ``io.BytesIO``, so it must be a bytes-like object
    holding the encoded image. Cache entries are keyed by ``cam_id``.
    """

    def put_hash(self, src, cam_id=''):
        """Hash ``src`` and store it as the entry for ``cam_id``.

        Any earlier entry for the same ``cam_id`` is replaced.
        """
        # Reuse the shared hashing helper instead of duplicating its body.
        image_hash = self.get_image_hash(src)
        meta = dict(timestamp=time.localtime(), image_hash=image_hash)
        print("Putting hash " + str(image_hash) + " for cam " + str(cam_id) + " in cache.")
        self.cache[cam_id] = meta

    def is_image_duplicate(self, src, cam_id=''):
        """Return True when ``src`` is within 3 hash bits of the cached entry.

        Returns False when nothing has been cached for ``cam_id`` yet.
        """
        print("Checking Duplicate: " + str(cam_id))
        if cam_id not in self.cache:
            return False
        cached_hash = self.cache[cam_id]['image_hash']
        new_hash = self.get_image_hash(src)
        diff = dhash.get_num_bits_different(int(cached_hash, 16), int(new_hash, 16))
        return diff <= 3

    def get_image_hash(self, src):
        """Return the hex-formatted difference hash of the encoded image ``src``."""
        # Decode from memory; the with-block releases the image when done.
        with Image.open(io.BytesIO(src)) as image:
            row, col = dhash.dhash_row_col(image)
        return dhash.format_hex(row, col)