# FaceEncoder.py
class GenerateFaces:
"""
A class to extract and encode faces of specified pictures
...
Description
----------
GenerateFaces is a class with multiple sub functions which are trying to detect faces in any given picture using the Multi-task
Cascaded Convolutional Networks (MTCNN). The process of MTCNN consists of three tasks (Face classification, Bounding box regression and
Facial Landmark localization) of convolutional networks that are able to recognize faces and landmark location such as eyes, nose,
and mouth. We then recenter the detected faces using the determined landmarks (eyes, node, mouth) because we need to make sure that
width and height of the cropped faces are of equal size as input to our FaceRecognition model (required input 160x160).
Next we crop the faces out of the pictures and use the last layer of our face recognition model (here FaceNet) to encode the cropped
faces and archive all the intermediate and final results in a .json database
...
Methods
-------
run()
run the pipline
get_database()
returns the full database
...
Helper functions
-------
crop_faces_centered_boxes_for_encoding()
iterate through each detected face, recenter, crop, encode and archive (if requested also plot) each face
import_FaceNet()
import FaceNet
img_to_encoding()
scales the image to expected input shape (160x160) and runs the forward propagation of the model on the specified image to get
the encodings of the faces
archive_database()
archive the generated database to specified output directory
"""
# The init method or constructor
def __init__(self, file_path , output="/mnt/golem/frodo/cropped_faces/", thr=0, plot=True, model_path='../dat/models/keras-facenet-h5/model.json', weight_path = '../dat/models/keras-facenet-h5/model.h5', database="/mnt/golem/frodo/Database/FaceDB.json"):
"""
Parameters
----------
file_path : str
path to the input pictures - pictures to extract faces. If the argument is specified with None. you could use the class to
import the alredy existing database e.g GenerateFaces(file_path=None, *args).get_database()
output : str
path to the output directory - where the cropped faces are stored
thr : float, optional
the confidence threshold - probability that the detected face is a face (default 0)
plot : bool
if we want to plot the cropped faces in the terminal - not recommended
model_path : str
path to the Face Recognition model configuration (FaceNet)
weigth_path : str
path to the pretrained weights of the Face Recognition model (FaceNet)
"""
import os
import json
# check if the specified database path already exists
if os.path.isfile(database):
# load the pre-existing database
self.DB = json.loads(open(database).read())
# determine which unique keys are already reserved
self.n = len(self.DB)
# if there is no pre-existing database
else:
# initiate a container for the database and the unique ID
self.DB = None
self.n = 0
### initiate all specified arguments such that they are reachable from the helper functions
# path to the input pictures - pictures to extract faces from
self.path = file_path
# path to the output directory - where the cropped faces are stored
self.output_dir = output
# the confidence threshold - probability that the detected face is a face
self.thr = thr
# if we want to plot the cropped faces in the terminal - not recommended
self.plot = plot
# self.container serves as our internal database
self.container = {}
# import our face recognition model to encode the cropped faces
self.import_FaceNet(model_path=model_path, weight_path=weight_path)
# initiate the batch counter - number of pictures already processed in the current batch
self.batch = 0
self.database = database
def run(self, archive = True, batch_size = 100):
"""
starts the pipline.
...
Description
----------
.run() launch the pipline by looping through each specified image detect if and where there is a face (MTCNN), recenter
(using landmarks in the face), crop, encode (FaceNet) and archive the intermediate as well as final results
...
Parameters
----------
archive : bool, optional
determines if you want to archive the intermediate and final results into the specified database (default is True)
batch_size : int, optional
the number of processed pictures after which the database will be loged (default is 100)
...
Output
----------
1) archives the intermediate and final results in a json (our reference database used for clustering) file with the following
structure
uniqueID:
path_pic_orig : str - path of original image
path_croped_pic : str - path of cropped image/face
encodings: list - last layer of FaceNet (1x128)
recentered_rectangle: list - coordinates of cropped recentered face in original image in form of [x1, x2, y1, y2]
detector : dic - MTCNN output containing Face Classification, Bounding Box Regression and Facial Landmark Localization
box : list - coordinates of original non-recentered detected face in original image in form of [x1, y1, width, height]
confidence : float - probability that the given rectangle is a real face
keypoints : dic - landmark coordinates
left_eye : tuple of x,y ccoordinates marking the left eye
right_eye : tuple of x,y ccoordinates marking the right eye
nose : tuple of x,y ccoordinates marking the nose eye
mouth_left : tuple of x,y ccoordinates marking the left mouth
mouth_right : tuple of x,y ccoordinates marking the right mouth
2) each detected face will be cropped out of the original image and saved in the specified output directory in the following
structure
if the probability is higher than the specified threshold (default = 0)
uniqueID_cropped_face.png: file in specified ouput directory
otherwise
less_confident_faces: folder
{}(confidence)_cropped_face.png: file in sub of output directory
"""
from tqdm import tqdm
import matplotlib.pyplot as plt
from mtcnn.mtcnn import MTCNN
from PIL import Image
from pillow_heif import register_heif_opener
import numpy as np
# create/initiate the detector once, using default weights (reused for every image)
detector = MTCNN()
# loop through each image
for pics in tqdm(self.path):
if self.DB is not None:
# check if the image was already processed and successfully archived in the database
if pics in list(set([v["path_pic_orig"] for v in self.DB.values()])):
print("[INFO] this image is already in the Database {}".format(pics))
continue
# save the image path such that it can be reached in the helper functions
self.filename = pics
# load the image depending on the image format/suffix
if self.filename.split(".")[-1].lower() == "heic":
# HEIC is Apple's proprietary version of HEIF, the High-Efficiency Image File format. This newer file format is
# intended to be a better way to save your pictures, making your images smaller in terms of data while retaining high
# quality.
register_heif_opener()
try:
image = Image.open(self.filename)
self.pixels = np.asarray(image)
except Exception as e:
print("[WARNING] there was an error in this image {} - maybe it is truncated?".format(self.filename))
print(e)
continue
elif ("PNG" == self.filename.split(".")[-1]) | ("png"== self.filename.split(".")[-1]):
# png have 4 channels R,G,B and alpha for transparency --> here we get rid of the aloha/transperency channel
try:
image = Image.open(self.filename).convert('RGBA')
background = Image.new('RGBA', image.size, (255,255,255))
alpha_composite = Image.alpha_composite(background, image)
alpha_composite_3 = alpha_composite.convert('RGB')
except Exception as e:
print("[WARNING] there was an error in this image {} - maybe it is truncated?".format(self.filename))
print(e)
continue
self.pixels = np.asarray(alpha_composite_3)
else:
try:
# try to import any other image format - designed for the jpg/JPG suffix, but it may work for other formats as well (not tested yet)
self.pixels = plt.imread(self.filename)
except Exception as e:
# if there was an error although it is in jpg format - it is probably truncated
if ("jpg" == self.filename.split(".")[-1].lower()):
print("[WARNING] there was an error in this image {} - maybe it is truncated?".format(self.filename))
else:
# otherwise it is probably because we did not integrate the specified format
print("[WARNING] there was an error in this image {} - we do not support this image format yet {}".format(self.filename, self.filename.split(".")[-1]))
print(e)
continue
# try:
# self.pixels = self.super_resolution(self.pixels)
# except Exception as e:
# print("[WARNING] the picture was not upsampled using the super resolution method")
# print(e)
# detect faces in the image (the detector was created once before the loop)
self.faces = detector.detect_faces(self.pixels)
# crop, recenter and encode the detected faces
self.crop_faces_centered_boxes_for_encoding()
# archive the temporarily stored processed faces whenever the batch is full
if archive:
if self.batch > batch_size:
print("[INFO] archive batch")
self.archive_database()
self.batch = 0
else:
self.batch += 1
# closing statement with some statistics when the run completed
if archive:
print("[INFO] preparing final archive of FaceDB ... \nlooped through {} pictures, detected {} faces in {} pictures".format(len(self.path), len(self.container), len(np.unique([v["path_pic_orig"] for v in self.container.values()]))))
self.archive_database()
print("[INFO] archived 'FaceDB' successfully!")
print("[INFO] There are currently {} faces encoded in FaceDB".format(len(self.container)))
else:
print("[INFO] DB was not archived")
# crop, recenter and encode each detected face separately
def crop_faces_centered_boxes_for_encoding(self):
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
# input dimension
dim = self.pixels.shape
# loop through each detected face in a picture - remember one image can have multiple faces - think of group photos and so on
for i in range(len(self.faces)):
# if confident in the face detection
if self.faces[i]['confidence'] > self.thr:
# generate a unique id for the detected face
self.n += 1
ID = self.n
# get coordinates of the originally detected face
x1, y1, width, height = self.faces[i]['box']
x2, y2 = x1 + width, y1 + height
# recenter the detected rectangle around the face
circles = self.faces[i]['keypoints'].values() # remember keypoints contains a dict of landmarks as (x, y) tuples
center = (sum(k[0] for k in circles)/len(circles), sum(k[1] for k in circles)/len(circles))
### adjust the ratio of the rectangle towards the longer side of height and width
### remember the input for FaceNet is 160x160 and we don't want to deform the face by resizing it
### so we recenter the rectangle and keep both sides of equal length
# determine the longer side of width and height
MAX = int(np.max([width, height]))
x1, x2, y1, y2 = (int(center[0]-MAX/2), int(center[0]+MAX/2), int(center[1]-MAX/2), int(center[1]+MAX/2))
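# worked example (illustrative numbers): a 120x80 detection whose landmark center is (200, 150) gives
# MAX = 120, so x1, x2 = 200 -/+ 60 -> 140, 260 and y1, y2 = 150 -/+ 60 -> 90, 210; this square is then
# clamped back into the image by the checks below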
# check for the cases that we go out of the image with the new rectangle
# in case we go out of the left side of the image
if x1 < 0:
# take the part which was over the left side and add it to the right side of the rectangle
x2 += x1*-1
# set the left point to the border of the left side of the image
x1 = 0
# in case we go out of the right side of the image
if x2 > dim[1]:
# take the part which was over the right side and add it to the left side of the rectangle
x1 -= x2-dim[1]
# set the right point to the border of the right side of the image
x2 = dim[1]
# in case we go out of the top of the image
if y1 < 0:
# take the part which was over the top and add it to the bottom of the rectangle
y2 += y1*-1
# set the top point to the border of the top of the image
y1 = 0
# in case we go out of the bottom of the image
if y2 > dim[0]:
# take the part which was over the bottom and add it to the top of the rectangle
y1 -= y2-dim[0]
# set the bottom point to the border of the bottom of the image
y2 = dim[0]
# crop the face out of the image
crop = self.pixels[y1:y2, x1:x2]
# save the cropped face in the specified output directory for manual inspection
output_dir = self.output_dir + str(ID) + "_cropped_face.png"
cv2.imwrite(output_dir, cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
# resize and encode the face
self.encodings = self.img_to_encoding(output_dir)
# check if there is already an existing database
if self.DB is not None:
# check if we have already processed this face by comparing the encodings
# note: at the beginning of run() we already checked whether the image was processed. However, that check was based only
# on the filename - the image could be renamed and appear as a new, unique image - that is not what we want.
# Therefore this part makes sure that we keep only unique faces with no replications
match = [k for k,v in self.DB.items() if v["encodings"] == self.encodings.tolist()]
if len(match) > 0:
print("[INFO] This face already exists in the database {}".format(match[0]))
# since we have already saved this cropped file before we need to remove it from the directory
os.remove(output_dir)
# if this face was not already stored in the database, archive it temporarily until the batch is full
else:
self.container[ID] = {"path_pic_orig": self.filename,
"path_croped_pic": output_dir,
"encodings": self.encodings.tolist(),
"recentered_rectangle": [x1, x2, y1, y2],
"detector": self.faces[i]}
# if there is no pre-existing database, archive the intermediate and final results temporarily until the batch is full
else:
self.container[ID] = {"path_pic_orig": self.filename,
"path_croped_pic": output_dir,
"encodings": self.encodings.tolist(),
"recentered_rectangle": [x1, x2, y1, y2],
"detector": self.faces[i]}
# plot the faces if wanted - not recommended
if self.plot:
# define subplot
plt.subplot(1, len(self.faces), i+1)
plt.axis('off')
# plot face
plt.imshow(crop)
# if the confidence of the face detection is too low (< thr)
# skip the processing part, keep the original face detected by MTCNN and save the cropped face into a subfolder
# called less_confident_faces in the output directory
else:
# get coordinates of the original face detected by MTCNN
x1, y1, width, height = self.faces[i]['box']
x2, y2 = x1 + width, y1 + height
# check if the folder less_confident_faces already exists; if not -> create it
if not os.path.isdir(self.output_dir + "less_confident_faces"):
os.mkdir(self.output_dir + "less_confident_faces")
# save the cropped detected face
output_dir = self.output_dir + "less_confident_faces/" + str(self.faces[i]['confidence']) + "_cropped_face.png"
cv2.imwrite(output_dir, cv2.cvtColor(self.pixels[y1:y2, x1:x2], cv2.COLOR_RGB2BGR))
print("[INFO] This face did not have a high confidence {}".format(self.faces[i]['confidence']))
def import_FaceNet(self, model_path, weight_path):
"""
import FaceNet
...
Description
----------
.import_FaceNet() imports the FaceNet model. FaceNet is a face recognition system developed in 2015 by researchers at Google that
achieved then state-of-the-art results on a range of face recognition benchmark datasets.
...
Parameters
----------
model_path : str
path to the FaceNet config file
weight_path : str
path to the FaceNet weight file
"""
from tensorflow.keras.models import model_from_json
with open(model_path, 'r') as json_file:
loaded_model_json = json_file.read()
self.FaceNet = model_from_json(loaded_model_json)
self.FaceNet.load_weights(weight_path)
def img_to_encoding(self, image_path):
"""
scales the image to expected input shape (160x160) and runs the forward propagation of the model on the specified image to get
the encodings of the faces
...
Description
----------
.img_to_encoding() scales the image to expected input shape (160x160) for the FaceNet model and runs the forward propagation of the
model on the specified image to get the encodings of the faces
...
Parameters
----------
image_path : str
path to the image file
Output
----------
encoding : np.array of shape (1,128)
last layer of FaceNet model
"""
import tensorflow as tf
import numpy as np
img = tf.keras.preprocessing.image.load_img(image_path, target_size=(160, 160))
img = np.around(np.array(img) / 255.0, decimals=12)
x_train = np.expand_dims(img, axis=0)
embedding = self.FaceNet.predict_on_batch(x_train)
return embedding / np.linalg.norm(embedding, ord=2)
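# note: the returned embedding is L2-normalized, so for two encodings a and b the squared Euclidean distance
# relates to their inner product via ||a - b||^2 = 2 - 2 * (a @ b.T); this is why a fixed distance threshold
# (as used by FaceRecognition.who_is_it below) is comparable across images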
def archive_database(self):
"""
archive the database
...
Description
----------
.archive_database() converts and archives the temporarily stored dictionary to a json file in the specified
directory, containing all intermediate as well as final results of the run() pipeline in the following format.
uniqueID:
path_pic_orig : str - path of original image
path_croped_pic : str - path of cropped image/face
encodings: list - last layer of FaceNet (1x128)
recentered_rectangle: list - coordinates of cropped recentered face in original image in form of [x1, x2, y1, y2]
detector : dic - MTCNN output containing Face Classification, Bounding Box Regression and Facial Landmark Localization
box : list - coordinates of original non-recentered detected face in original image in form of [x1, y1, width, height]
confidence : float - probability that the given rectangle is a real face
keypoints : dic - landmark coordinates
left_eye : tuple of x,y coordinates marking the left eye
right_eye : tuple of x,y coordinates marking the right eye
nose : tuple of x,y coordinates marking the nose
mouth_left : tuple of x,y coordinates marking the left corner of the mouth
mouth_right : tuple of x,y coordinates marking the right corner of the mouth
...
Output
----------
database : file
json file containing all intermediate as well as final results of the run() pipeline
"""
import json
DB = self.get_database()
with open(self.database, 'w') as fp:
json.dump(DB, fp)
def get_database(self):
"""
get the database
...
Description
----------
.get_database() checks if there is already a pre-existing database and returns the full database (= pre-existing DB + generated
DB). The database has the following format.
uniqueID:
path_pic_orig : str - path of original image
path_croped_pic : str - path of cropped image/face
encodings: list - last layer of FaceNet (1x128)
recentered_rectangle: list - coordinates of cropped recentered face in original image in form of [x1, x2, y1, y2]
detector : dic - MTCNN output containing Face Classification, Bounding Box Regression and Facial Landmark Localization
box : list - coordinates of original non-recentered detected face in original image in form of [x1, y1, width, height]
confidence : float - probability that the given rectangle is a real face
keypoints : dic - landmark coordinates
left_eye : tuple of x,y coordinates marking the left eye
right_eye : tuple of x,y coordinates marking the right eye
nose : tuple of x,y coordinates marking the nose
mouth_left : tuple of x,y coordinates marking the left corner of the mouth
mouth_right : tuple of x,y coordinates marking the right corner of the mouth
...
Output
----------
database : dict
dictionary containing all intermediate as well as final results of the run() pipeline
"""
if self.DB is not None:
self.container.update(self.DB)
return self.container
def restructure_DB(self,
cluster_path="/mnt/golem/frodo/ClusteredFaces_clean/",
new_DB_path = "/mnt/golem/frodo/Database/New_FaceDB.json",
ignore=["Face_-1"],
ig_faces=True):
"""
restructure the raw face database into a per-person database
...
Description
----------
.restructure_DB() reads the manually curated cluster folders under cluster_path (one folder per person) and regroups the raw
FaceDB entries into the form {person name: lists of the identified faces' attributes}. Folders listed in ignore (and, if
ig_faces is True, all folders still carrying automatic "Face_*" labels) are skipped. The new database is archived as a json
file at new_DB_path and also returned.
"""
import os
import re
import json
from tqdm import tqdm
# get the raw database
FaceDB = self.DB
# get all the identities of user defined labels (=folder names)
ID = os.listdir(cluster_path)
ID = [id for id in ID if os.path.isdir(cluster_path + id)]
if ig_faces:
ignore += [id for id in ID if re.match("Face.*", id)]
# get all the unique keys of the cropped faces clustered together
r = re.compile(".*png")
# pictures = {id: [re.split("[_.]",v)[1] for v in list(filter(r.match, os.listdir(cluster_path + id)))]
# for id in ID
# if id not in ignore}
# uncomment the block above when not using the extraction pipeline
pictures = {id: [re.split("[_]",v)[0] for v in list(filter(r.match, os.listdir(cluster_path + id)))]
for id in ID
if id not in ignore}
# restructure the DB to the form of {real person name: list of FaceDB dict of the identified faces}
p_o = []
p_c = []
e = []
r_r = []
de = []
New_DB = {}
for new_id, old_id in tqdm(pictures.items()):
for k in old_id:
for K,V in FaceDB[k].items():
if K == 'path_pic_orig':
p_o.append(V)
elif K == 'path_croped_pic':
p_c.append(V)
elif K == 'encodings':
e.append(V)
elif K == 'recentered_rectangle':
r_r.append(V)
else:
de.append(V)
New_DB.update({new_id: {'path_pic_orig': p_o,
'path_croped_pic': p_c,
'encodings': e,
'recentered_rectangle': r_r,
'detector': de}})
p_o = []
p_c = []
e = []
r_r = []
de = []
with open(new_DB_path, 'w') as fp:
json.dump(New_DB, fp)
return New_DB
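# A hedged usage sketch for restructure_DB (all paths below are placeholders, and it assumes the FaceNet model
# files are available at the default model/weight paths, since __init__ loads them):
# gen = GenerateFaces(file_path=None, database="Database/FaceDB.json")
# new_db = gen.restructure_DB(cluster_path="/path/to/ClusteredFaces_clean/",
#                             new_DB_path="/path/to/New_FaceDB.json")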
def super_resolution(self, img, model="lapsrn", zoom=8):
"""
upsample an image with an OpenCV dnn_superres model (e.g. LapSRN x8) before face detection
"""
import cv2
import os
sr = cv2.dnn_superres.DnnSuperResImpl_create()
path = "smart_surveillance/FaceRecognition/superresolution_models/"
# pick the model file matching the requested architecture and upsampling ratio
path = [path + i for i in os.listdir(path) if (i.split("_")[0].lower() == model) & (int(i.split("_")[1][1]) == zoom)][0]
sr.readModel(path)
sr.setModel(model, zoom) # set the model by passing the name and the upsampling ratio
result = sr.upsample(img) # upscale the input image
return result
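# A minimal usage sketch of the extraction pipeline. The glob pattern and all paths are assumptions for
# illustration, not values shipped with this module.
def _example_extract_faces():
    """Hedged example: drive GenerateFaces over a hypothetical picture folder and return the database."""
    from glob import glob
    pictures = glob("/path/to/pictures/*.jpg")  # hypothetical input folder
    gen = GenerateFaces(file_path=pictures,
                        output="/path/to/cropped_faces/",  # hypothetical output folder
                        plot=False,
                        database="/path/to/Database/FaceDB.json")  # hypothetical database path
    gen.run(archive=True, batch_size=100)
    return gen.get_database()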
######################################################################################################
###
###
######################################################################################################
class FaceCluster:
"""
A class to cluster the extracted faces based on the encodings of the cropped faces.
...
Description
----------
FaceCluster is a class to cluster the extracted faces based on the encodings of the cropped faces. This class uses the improved DBSCAN
method called HDBSCAN for clustering
...
Methods
-------
Cluster()
run the clustering pipeline
...
Output
-------
cluster labels of HDBSCAN
"""
def __init__(self, file_path="Database/FaceDB.json", IDs = None):
"""
Parameters
----------
file_path : str
path to the database you have just created using GenerateFaces().run()
IDs : list, optional
restrict the clustering pipeline to a selection of faces; must be a list of uniqueIDs pointing to
specific faces
"""
import json
import os
import numpy as np
# check if there is a pre-existing database
if not (os.path.isfile(file_path) and os.access(file_path, os.R_OK)):
print('[ERROR] The input encoding file ' + str(file_path) + ' does not exist or is unreadable')
exit()
# load the database
self.DB = json.loads(open(file_path).read())
# if you specified specific images via the IDs argument, consider only these images for the clustering analysis
if IDs is not None:
self.DB = {k: v for k,v in self.DB.items() if k in IDs}
# generate a matrix of all encodings where each row is an individual face and columns are encodings, with dimensions (n,128)
self.encodings = np.vstack([np.array(v["encodings"]) for k,v in self.DB.items()])
def Cluster(self):
"""
.Cluster() runs HDBSCAN on the face encodings of FaceNet last layer to identify and cluster similar/same faces.
...
Description
----------
.Cluster() runs HDBSCAN on the face encodings of FaceNet last layer.
HDBSCAN - Hierarchical Density-Based Spatial Clustering of Applications with Noise. Performs DBSCAN over varying epsilon values
and integrates the result to find a clustering that gives the best stability over epsilon. This allows HDBSCAN to find clusters of
varying densities (unlike DBSCAN), and be more robust to parameter selection.
In practice this means that HDBSCAN returns a good clustering straight away with little or no parameter tuning -- and the primary
parameter, minimum cluster size, is intuitive and easy to select.
HDBSCAN is ideal for exploratory data analysis; it's a fast and robust algorithm that you can trust to return meaningful clusters
(if there are any).
...
Output
----------
cluster labels of HDBSCAN
"""
# from sklearn.cluster import DBSCAN
import hdbscan
import numpy as np
# cluster the embeddings
print("[INFO] Clustering")
clt = hdbscan.HDBSCAN(min_cluster_size=10)
# alternative: DBSCAN with a fixed epsilon
# clt = DBSCAN(eps=0.75, min_samples=10, n_jobs=-1)
# clt.fit(self.encodings)
(labelIDs, labelIDs_freq) = np.unique(clt.fit_predict(self.encodings), return_counts=True)
# determine the total number of unique faces found in the dataset (label -1 marks noise)
numUniqueFaces = len(np.where(labelIDs > -1)[0])
print("[INFO] # unique faces: {}\n".format(numUniqueFaces))
return clt.labels_
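# A minimal usage sketch of the clustering step (the database path is a placeholder). HDBSCAN labels noise
# points with -1; all other integers are cluster ids.
def _example_cluster_faces():
    """Hedged example: cluster the archived encodings and return the labels."""
    fc = FaceCluster(file_path="Database/FaceDB.json")  # hypothetical path
    labels = fc.Cluster()
    # labels[i] is the cluster of the i-th face in fc.DB (insertion order); -1 means noise / no cluster
    return labels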
######################################################################################################
###
###
######################################################################################################
class FaceImageGenerator:
"""
A class that archives the clustered faces into same cluster folders to get a structured profile for each subject that was detected in
the provided images.
...
Description
----------
FaceImageGenerator is a class that archives the clustered faces into same cluster folders to get a structured profile for
each subject that was detected in the provided images. This facilitates the evaluation of correct and wrong clusters.
...
Methods
-------
GenerateImages()
archive the clustered faces in structured/clustered folders
"""
def GenerateImages(self, labels, cluster_dir="./", OutputFolderName = "ClusteredFaces",
MontageOutputFolder = "Montage",
file_path= "Database/FaceDB.json",
IDs = None
):
"""
archive each clustered face into a per-cluster folder ("Face_<label>") and build a montage of at most 25 faces per cluster.
labels are the cluster labels returned by FaceCluster().Cluster(), file_path points to the database created by
GenerateFaces().run(), and IDs optionally restricts the archiving to a selection of uniqueIDs.
"""
import shutil
import os
import time
import json
import numpy as np
import cv2
from imutils import build_montages
from tqdm import tqdm
# create the output folder for the Clustered faces
OutputFolder = cluster_dir + OutputFolderName
# if that folder is not existing already create the folder
if not os.path.exists(OutputFolder):
os.makedirs(OutputFolder)
else:
# If the folder exists, delete the folder and all files and subdirectories below it.
shutil.rmtree(OutputFolder)
# wait a bit before you recreate that folder
time.sleep(0.5)
os.makedirs(OutputFolder)
# create the Montage folder path
MontageFolderPath = os.path.join(OutputFolder, MontageOutputFolder)
os.makedirs(MontageFolderPath)
# check if there is an existing database - if not exit
if not (os.path.isfile(file_path) and os.access(file_path, os.R_OK)):
print('The input encoding file ' + str(file_path) + ' does not exist or is unreadable')
exit()
# otherwise load the database
self.DB = json.loads(open(file_path).read())
# select only the faces which you clustered in FaceCluster
if IDs is not None:
self.DB = {k: v for k,v in self.DB.items() if k in IDs}
# pull all the encodings from the database
self.IDs = list(self.DB.keys())
self.encodings = np.vstack([np.array(v["encodings"]) for k,v in self.DB.items()])
# get all unique clusters as well as their frequencies
(labelIDs, labelIDs_freq) = np.unique(labels, return_counts=True)
# loop over the unique face label integers
for labelID, labelID_freq in tqdm(zip(labelIDs, labelIDs_freq)):
print("[INFO] There are {} faces for FaceCluster ID: {}".format(labelID_freq, labelID))
# create a folder for each unique cluster
FaceFolder = os.path.join(OutputFolder, "Face_" + str(labelID))
os.makedirs(FaceFolder)
# find all indexes that belong to the current label ID
idxs = np.where(labels == labelID)[0]
ID = np.array(self.IDs)[idxs].tolist()
# initialize the list of faces to include in the montage
portraits = []
# loop over the sampled indexes
for i in ID:
# load the cropped face image
portrait = cv2.imread(self.DB[i]["path_croped_pic"])
# append the portrait to the montage if the montage is not full yet (< 25)
if len(portraits) < 25:
portraits.append(portrait)
# assign a filename for the clustered face
FaceFilename = "face_" + str(i) + ".png"
# archive the individual clustered face
FaceImagePath = os.path.join(FaceFolder, FaceFilename)
cv2.imwrite(FaceImagePath, portrait)
# build the montage of each cluster from the appended images (at most 25 faces on a 5x5 grid)
montage = build_montages(portraits, (96, 120), (5, 5))[0]
# archive the montage/portrait in the specified directory
MontageFilenamePath = os.path.join(
MontageFolderPath, "FaceCluster_" + str(labelID) + ".jpg")
cv2.imwrite(MontageFilenamePath, montage)
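# End-to-end sketch chaining the three classes as described in their docstrings; every path below is a
# placeholder for illustration.
def _example_full_pipeline():
    """Hedged example: extract and encode faces, cluster them, then archive per-cluster folders and montages."""
    from glob import glob
    GenerateFaces(file_path=glob("/path/to/pictures/*.jpg"),
                  output="/path/to/cropped_faces/",
                  plot=False,
                  database="Database/FaceDB.json").run()
    labels = FaceCluster(file_path="Database/FaceDB.json").Cluster()
    FaceImageGenerator().GenerateImages(labels, cluster_dir="./",
                                        file_path="Database/FaceDB.json")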
################################################################################################################################
###
###
################################################################################################################################
class FaceRecognition:
def who_is_it(self, image_path, database,
model_path='../dat/models/keras-facenet-h5/model.json',
weight_path = '../dat/models/keras-facenet-h5/model.h5',
thr=0.75,
plot=False):
"""
Implements face recognition for the office by finding who the person in the image_path image is.
...
Parameters
-------
image_path : str
path to an image
database : dic
database containing image encodings along with the name of the person on the image as key
model_path : str
path to the FaceNet config file
weight_path : str
path to the FaceNet weight file
thr : float, optional
maximum distance at which a face is still considered represented in the database (default 0.75)
plot : bool, optional
if the detected face should be plotted
...
Output
-------
min_dist : float
the minimum distance between the image_path encoding and the encodings from the database
identity : str
the name prediction for the person on image_path
"""
import numpy as np
import tensorflow as tf
import json
import matplotlib.patches as patches
from numpy.linalg import norm
# import the FaceNet model
self.import_FaceNet(model_path=model_path, weight_path=weight_path)
face = self.detect_face(image_path, plot=plot)
if face is None:
return None, "No face visible"
face = self.resize(face, 160,160)
# Compute the target "encoding" for the image