-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_data.py
107 lines (89 loc) · 3.51 KB
/
generate_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
import sys
import click
import cv2 as cv
import pathlib
from threading import Lock
from multiprocessing.pool import ThreadPool
from sys import platform
from collections import deque
try:
    # Locate and import the OpenPose Python bindings (Windows/Ubuntu/OSX).
    dir_path = os.path.dirname(os.path.realpath(__file__))
    try:
        if platform == "win32":
            # Windows: adjust these paths so they point at the right build
            # folder (Release/x64 etc.).
            sys.path.append(dir_path + '/../../python/openpose/Release')
            os.environ['PATH'] += (
                ';' + dir_path + '/../../x64/Release;'
                + dir_path + '/../../bin;'
            )
            import pyopenpose as op
        else:
            # Non-Windows: adjust this path so it points at the right build
            # folder (Release/x64 etc.).
            sys.path.append('openpose/build/python/')
            # After `make install` (default path is `/usr/local/python` on
            # Ubuntu) the OpenPose/python module can also be imported from
            # the installation path; make sure that path is on the Python
            # path in order to use it.
            # sys.path.append('/usr/local/python')
            from openpose import pyopenpose as op
    except ImportError as import_err:
        print('Error: OpenPose library could not be found. Did you enable `BUILD_PYTHON` in CMake and have this Python script in the right folder?')
        raise import_err
except Exception as err:
    # Any failure above (including the re-raised ImportError) is printed and
    # terminates the script with status -1.
    print(err)
    sys.exit(-1)
def set_params():
    """Create, configure and start the OpenPose Python wrapper.

    The wrapper is configured with the model folder ``openpose/models/``,
    face and hand options switched on, and ``disable_blending`` switched off.

    Returns:
        The started ``op.WrapperPython`` instance.
    """
    options = {
        "model_folder": "openpose/models/",
        "face": True,
        "hand": True,
        "disable_blending": False,
    }
    op_wrapper = op.WrapperPython()
    op_wrapper.configure(options)
    op_wrapper.start()
    return op_wrapper
# Module-level OpenPose wrapper, created (and started) when this module is
# loaded; process_frame() reads it as a global.
wrapper = set_params()
# Command-line entry point.
#   --file       existing path of the input video (required)
#   --directory  output folder name (defaults to "data")
# Documented with comments rather than a docstring so that the text click
# shows for --help stays exactly as it was.
@click.command()
@click.option("--file", type=click.Path(exists=True), required=True)
@click.option("--directory", type=str, default="data")
def cli(file, directory):
    generate_data(file=file, directory=directory)
def process_frame(frame):
    """Run OpenPose on one frame and derive a binary mask from its rendering.

    The frame is pushed through the module-level ``wrapper``; the input frame
    is then subtracted from the rendered output, the difference is converted
    to grayscale and every non-zero pixel is set to 255.

    NOTE(review): the subtraction presumably isolates what OpenPose drew on
    top of the frame, and assumes both arrays share shape and dtype - confirm.

    Args:
        frame: Image array to analyse.

    Returns:
        A ``(frame, mask)`` tuple: the unchanged input and the binary mask.
    """
    pose_datum = op.Datum()
    pose_datum.cvInputData = frame
    wrapper.emplaceAndPop([pose_datum])

    difference = pose_datum.cvOutputData - frame
    difference_gray = cv.cvtColor(difference, cv.COLOR_RGB2GRAY)
    _, mask = cv.threshold(difference_gray, 0, 255, cv.THRESH_BINARY)
    return frame, mask
def generate_data(file, directory):
    """Write source/target image pairs for every frame of a video.

    Frames read from ``file`` are handed to ``process_frame`` on a thread
    pool. Results are consumed in submission order and written to
    ``directory`` as ``source_<n>.png`` (the frame) and ``target_<n>.png``
    (the mask returned by ``process_frame``), with ``n`` counting from 0.

    Args:
        file: Path of the video to read.
        directory: Output folder; created when it does not exist.

    Raises:
        SystemExit: With status 1 when the video cannot be opened.
    """
    os.makedirs(directory, exist_ok=True)

    cap = cv.VideoCapture(file)
    if not cap.isOpened():
        print("Cannot open camera")
        # Non-zero status so callers/shell scripts can detect the failure.
        sys.exit(1)

    thread_num = cv.getNumberOfCPUs()
    pool = ThreadPool(processes=thread_num)
    pending = deque()
    frame_count = 0
    # Set once cap.read() fails; for a video file this is treated as the end
    # of the stream, so reading stops and the remaining tasks are drained.
    exhausted = False
    print("Entering main Loop.")
    try:
        while pending or not exhausted:
            # Flush results in submission order. Block on the oldest task
            # when the queue is full or no more frames will arrive, instead
            # of busy-waiting; otherwise only take tasks that are finished.
            while pending and (
                exhausted
                or len(pending) >= thread_num
                or pending[0].ready()
            ):
                orig, new = pending.popleft().get()
                cv.imwrite(
                    os.path.join(directory, f'source_{frame_count}.png'), orig
                )
                cv.imwrite(
                    os.path.join(directory, f'target_{frame_count}.png'), new
                )
                frame_count += 1
                if frame_count % 100 == 0:
                    print(f"Processed {frame_count} frames")

            if not exhausted:
                _ret, frame = cap.read()
                if _ret:
                    task = pool.apply_async(process_frame, (frame.copy(),))
                    pending.append(task)
                else:
                    exhausted = True

            # Kept from the original: allows aborting with 'q'.
            # NOTE(review): no window is created in this function, so this
            # probably never fires - confirm whether it is still wanted.
            if cv.waitKey(1) == ord('q'):
                break
    finally:
        # Always stop the workers and release the capture, even on error
        # or early abort.
        pool.close()
        pool.join()
        cap.release()
        cv.destroyAllWindows()
# Run the click command only when this file is executed as a script.
if __name__ == '__main__':
    cli()