Skip to content

Commit

Permalink
build frontend demo
Browse files Browse the repository at this point in the history
  • Loading branch information
AtticusZeller committed Mar 3, 2024
1 parent 47f8a5c commit b3debb5
Show file tree
Hide file tree
Showing 208 changed files with 1,096,930 additions and 1,287 deletions.
4 changes: 2 additions & 2 deletions .idea/BoostFace.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@
我们个项目的主要使用场景研究,是针对于多人脸的实时的人脸识别情况,这个多人的实时你人脸识别的情况,他的研究的主题是前端使用更优化的人脸检测算法,以及选择了一个比较合适的追踪方法,多目标追踪算法。他们分别在现有的数据集上面表现的非常的优秀,以及我人脸检测算法,他在现有的数据集上面表现仅仅只是针对于多人脸检测的效果非常好。然后是多目标追踪,我们选由于我们的上下文是人脸识别,因此不太可能出现人脸被遮挡的情况,因为人们会刻意的在识别的场景下露出人脸,即使是多人同时识别的环境下。因此我们在这种非遮蔽的情况下,选择了一个性能比较优异的,清亮的,以及能很好的和人脸追踪算法,和人脸检测算法结合在一起的多目标追踪算法。以及我们给他取了个新的名字,并且我们在人脸追踪数据集上面表现性,表现准确率,儿子追踪性能,追踪的性能优异,这是人脸追踪的部分。之所以使用这种方式是为了他可以大大的减少后端的负载,因为我们对人脸进行赋予语义进行追踪,判断前后针之间的人脸的关系,我们可以在此基础上加一些额外的可以选择的条件,比如说人脸满足什么大小,以及后端的准确率是多少。如果检测精度识别的精度较低,我们可以对对这个结果进行再次检测,错误的结果进行再次检测等,这种就是说我们可以加入额外的人为的选择性条件来时,检测识别更加的灵活,以及可以减少后端的负载。这是前端人脸追踪算法部分相关的研究

对于后端,我们主要采用异步网络框架发API,在异步网络框架中使用web sock的socket技术进行双向通讯,双向异步通讯来实时的与前端传输图像流。因此相比较传统的同步框架,我们做了对图像传输性能相关的测试。在高负荷的情况下,异步框架具有极高的性能,就是在后端网络框架上的研究。在选定了后端网络框架这个接口网关的接口之后,我们涉及到了人脸识别的问题。人脸识别我们采用最新的人脸特征提取器,我们从前端接触到从前端接收到人脸检测的图片,我们在后端进行拉伸,并且进行特征提取,这个特征提取由于它是非常计算密集型的,耗时较长,因此我们在后端可以采用cud深度学习加速推理。这是前后端分离项目的优势,以及提取完特征线上之后,我们需要拿到特征数据库中去比对。我们在此之前,我们也对人脸特征TSHE模型的精度与速度做了相关的测试。然后我们也对车上搜索特征向量数据库的搜索做了相关测试。线上数据库对大量线上数据中,进行线上搜索的性能以及准确度做了相关测试,并且尝试使用GPU进行加速搜索。以及涉及到后端服务器负载的问题,我们可以使用K8S进行自动扩容等相关的操作,从而使得后端的服务可以弄纳高并发的性能以及高负荷的情况。这是后端相关的研究


### TODO
- [] demo
1 change: 1 addition & 0 deletions src/Demo/frontend/app/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .signal_bus import signalBus
1 change: 1 addition & 0 deletions src/Demo/frontend/app/common/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .client import client
176 changes: 176 additions & 0 deletions src/Demo/frontend/app/common/client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import json
from base64 import urlsafe_b64decode
from pathlib import Path
from time import time

import requests
from cryptography.fernet import Fernet

from ...common.types import Face2SearchSchema
from ...utils.decorator import error_handler


class TokenEncryptor:
"""Encrypt and decrypt tokens"""

def __init__(self):
self.key = self.load_or_create_key()

def load_or_create_key(self):
"""Load or create encryption key"""
key_path = Path(__file__).parent / "secret.key"
if key_path.exists():
return key_path.read_bytes()
else:
key = Fernet.generate_key()
key_path.write_bytes(key)
return key

def encrypt_data(self, data):
"""Encrypt data"""
f = Fernet(self.key)
return f.encrypt(data.encode())

def decrypt_data(self, encrypted_data):
"""Decrypt data"""
f = Fernet(self.key)
return f.decrypt(encrypted_data).decode()


class TokenManager(TokenEncryptor):
def __init__(self):
super().__init__()
self.access_token = None
self.refresh_token = None
self._load_tokens()

@property
def is_token_expired(self) -> bool:
"""Check if token is expired"""
time_now = round(time())
expires_at = time_now
has_expired = True
if self.access_token and self.access_token.split(".")[1]:
payload = self._decode_jwt(self.access_token)
exp = payload.get("exp")
if exp:
expires_at = int(exp)
has_expired = expires_at <= time_now
return has_expired

@staticmethod
def _decode_jwt(token: str) -> dict:
"""Decode JWT"""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("JWT is not valid: not a JWT structure")
base64Url = parts[1]
# Adding padding otherwise the following error happens:
# binascii.Error: Incorrect padding
base64UrlWithPadding = base64Url + "=" * (-len(base64Url) % 4)
return json.loads(urlsafe_b64decode(base64UrlWithPadding).decode("utf-8"))

def save_tokens(self, access_token: str, refresh_token: str):
"""Save tokens to file"""
self.refresh_token = refresh_token
self.access_token = access_token
encrypted_access_token = self.encrypt_data(access_token)
encrypted_refresh_token = self.encrypt_data(refresh_token)
token_path = Path(__file__).parent / "tokens.enc"
token_path.write_bytes(encrypted_access_token + b"\n" + encrypted_refresh_token)

def _load_tokens(self):
"""Load tokens from file"""
token_path = Path(__file__).parent / "tokens.enc"
if token_path.exists():
encrypted_tokens = token_path.read_bytes().split(b"\n")
if len(encrypted_tokens) == 2:
self.access_token = self.decrypt_data(encrypted_tokens[0])
self.refresh_token = self.decrypt_data(encrypted_tokens[1])


class AuthClient:
"""Client for sending requests and handling tokens"""

def __init__(self, base_url):
self.base_url = base_url
self.base_ws_url = base_url.replace("http", "ws")
self.token_manager = TokenManager()
self.user: dict | None = None

def sign_up(self, face2register: Face2SearchSchema, id: str, name: str):
url = f"{self.base_url}/auth/face-register/{id}/{name}"
headers = {"Content-Type": "application/json"}
data = face2register.model_dump()
# qt_logger.debug(f"send data:{data}")
response = requests.post(url, json=data, headers=headers)

return response

@error_handler
def login(self, email: str, password: str) -> dict | None:
"""Login with email and password"""
url = f"{self.base_url}/auth/login"
response = requests.post(url, json={"email": email, "password": password})

if response.status_code == 200:
data = response.json()
self.user = data.get("user")
self.token_manager.save_tokens(
data.get("access_token"), data.get("refresh_token")
)
return data
else:
return None

@error_handler
def get_user_info(self):
url = f"{self.base_url}/user/info"
response = requests.get(url, headers=self._auth_header)
if response.status_code == 200:
return response.json()
else:
return None

def _refresh_session(self):
"""refresh token"""
url = f"{self.base_url}/auth/refresh-token"
params = {"refresh_token": self.token_manager.refresh_token}
refresh_response = requests.post(url, params=params)
if refresh_response.status_code == 200:
data = refresh_response.json()
self.token_manager.save_tokens(
data.get("access_token"), data.get("refresh_token")
)
else:
print("failed to refresh token")

def _auth_header(self) -> dict:
"""Authorization header with access and refresh tokens"""
if self.token_manager.access_token is None:
return {}
headers = {
"Authorization": f"Bearer {self.token_manager.access_token}",
"Refresh-Token": self.token_manager.refresh_token,
}
return headers


client = AuthClient("http://localhost:5000")
client.login(email="zhouge1831@gmail.com", password="Zz030327#")
if __name__ == "__main__":
# 使用示例

response = client.login("zhouge1831@gmail.com", "Zz030327#")
token_before_refresh = client.token_manager.access_token
client._refresh_session()
token_after_refresh = client.token_manager.access_token
if token_before_refresh != token_after_refresh:
print("refresh token success")
else:
print("refresh token failed")

if response["access_token"] is not None:
print("登录成功")
else:
print("登录失败")
Loading

0 comments on commit b3debb5

Please sign in to comment.