added ansible run method

IcyAltair 2025-10-09 18:42:11 +03:00
parent 243033584e
commit caa465f955
8 changed files with 76 additions and 1065 deletions

.gitignore

@@ -1,2 +1,8 @@
simar-key
config.ini
.venv
*.log
__pycache__/
*.pyc
*.egg-info/
.pytest_cache/


@@ -1,8 +1,6 @@
import os
import logging
import configparser
import datetime
import json
from flask import (
Flask,
@@ -21,45 +19,39 @@ from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
ABS_PATH = os.path.dirname(os.path.realpath(__file__))
UPLOAD_FOLDER = os.path.join(ABS_PATH, "tmp")
ALLOWED_EXTENSIONS = {"csv", "mp4"}
MAX_CONTENT_PATH = 1000000000
CONFIG = os.path.join(ABS_PATH, "config.ini")
SCHEDULER_API_ENABLED = True
SHOW_LOG = True
from logger import LoggerFactory
# Configure the logger factory
LoggerFactory.setting(
log_level=os.getenv("LOG_LEVEL", "INFO"),
log_format='[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s',
log_format="[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s",
show=True,
)
from cluster import Cluster
from manager_utils import batch_identification, subscribe_to_detections
from cluster_state import AutoState
app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates"))
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH
app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED
app.config["SESSION_TYPE"] = "filesystem"
auth = HTTPBasicAuth()
scheduler = APScheduler()
app_logger = LoggerFactory.get_logger('APP')
app_logger = LoggerFactory.get_logger("APP")
AUTO = AutoState(debug=False)
@scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900)
def cluster_state():
AUTO.check_cluster_state()
# app_logger.debug("Finished with auto cluster state")
# @scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900)
# def cluster_state():
# AUTO.check_cluster_state()
# # app_logger.debug("Finished with auto cluster state")
def get_config() -> dict:
@@ -83,71 +75,19 @@ def verify_password(username, password):
return username
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route("/")
# curl -u <user>:<pass> -d "playbook=ping_workers&args=hosts=workers" -v http://localhost:5010/api/v1.0/run_ansible
@app.route("/api/v1.0/run_ansible", methods=["POST"])
@auth.login_required
def home():
return render_template("index.html")
@app.route("/faces")
def faces():
return render_template("faces.html")
@app.route("/cluster", methods=["GET", "POST"])
def cluster_activity():
if request.method == "POST":
if request.form.get("get_state") == "Get state":
def run_ansible():
try:
cluster = Cluster(CONFIG, ABS_PATH)
command = (
"ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml"
)
lines = cluster.run_ansible_command(command, use_configs=True)
state = "".join(lines).replace("\\n", "\n")
flash(state, category="success")
except Exception as e:
flash(f"Get state failed with: {e}", category="error")
app_logger.error(f"Get state failed with: {e}")
return redirect(request.url)
if request.method == "GET":
return render_template("cluster.html")
@app.route("/faces/batch", methods=["GET", "POST"])
def batch():
if request.method == "POST":
lookupIds = request.form.get("lookupIds")
faceVectorArchive = request.form.get("faceVectorArchive")
recognitionThreshold = request.form.get("recognitionThreshold")
try:
matches = batch_identification(
lookupIds, faceVectorArchive, recognitionThreshold
)
flash(
f"Batch identification match {matches}",
category="success",
)
except Exception as e:
flash(f"Batch identification failed with error: {e}", category="error")
app_logger.error(f"Batch identification failed with error: {e}")
return redirect(request.url)
if request.method == "GET":
return render_template("batch.html")
# curl -v -u <user>:<pass> http://localhost:5010/faces/api/v1.0/get_cluster_state
@app.route("/api/faces/v1.0/get-cluster-state", methods=["GET"])
@auth.login_required
def get_cluster_activity():
try:
cluster = Cluster(CONFIG, ABS_PATH)
command = "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml"
cluster = Cluster(CONFIG, ABS_PATH, use_key=False)
playbook = request.form.get("playbook")
playbook_args = request.form.get("args")
if playbook and playbook_args:
command = f"""ansible-playbook --extra-vars "{playbook_args}" playbooks/{playbook}.yml"""
state = cluster.run_ansible_command(command, use_configs=True)
else:
state = "No playbook and playbook args given"
time = datetime.datetime.now()
data = {
"message": state,
@@ -156,47 +96,11 @@ def get_cluster_activity():
}
return make_response(jsonify(data), 200)
except Exception as e:
data = {"message": f"Get state failed with error: {e}", "code": "FAILED"}
data = {"message": f"Run ansible failed with error: {e}", "code": "FAILED"}
app_logger.error(data)
return make_response(jsonify(data), 400)
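# Example exchange for the endpoint above (editor's sketch; success fields
# beyond "message" are elided in this diff):
# curl -u <user>:<pass> -d "playbook=ping_workers&args=hosts=workers" http://localhost:5010/api/v1.0/run_ansible
# -> 200 {"message": "<ansible-playbook output>", ...}
# -> 400 {"message": "Run ansible failed with error: <e>", "code": "FAILED"}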
# curl -v -u <user>:<pass> -X POST -H "Content-type: application/json" -d @test_input.json http://localhost:5010/faces/api/v1.0/batch
@app.route("/api/faces/v1.0/batch", methods=["POST"])
@auth.login_required
def batch_id():
content_type = request.headers.get("Content-Type")
if content_type == "application/json":
if request.is_json:
data = request.json
lookupIds = data.get("lookupIds")
faceVectorArchive = data.get("faceVectorArchive")
recognitionThreshold = data.get("recognitionThreshold")
try:
matches = batch_identification(
lookupIds, faceVectorArchive, recognitionThreshold
)
time = datetime.datetime.now()
data = {
"message": matches,
"code": "SUCCESS",
"insertTimestamp": time,
}
return make_response(jsonify(data), 200)
except Exception as e:
data = {
"message": f"Batch identification failed with error: {e}",
"code": "FAILED",
}
app_logger.error(data)
return make_response(jsonify(data), 400)
else:
data = {"message": f"Invalid json: {request}", "code": "FAILED"}
return make_response(jsonify(data), 400)
else:
data = {"message": f"Wrong content type: {content_type}", "code": "FAILED"}
return make_response(jsonify(data), 400)
def run_app():
port = int(os.environ.get("PORT", 5010))
app_logger.info(ABS_PATH)


@@ -7,28 +7,33 @@ from tabulate import tabulate
class Cluster:
def __init__(self, config_path: str, abs_path: str) -> None:
def __init__(self, config_path: str, abs_path: str, use_key=False) -> None:
self.abs_path = abs_path
self.config = configparser.ConfigParser()
self.config.read(config_path)
self.auth_data = dict(self.config.items("CLUSTER"))
self.ssh = paramiko.SSHClient()
self.use_key = use_key
def connect(self) -> None:
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.use_key:
k = paramiko.RSAKey.from_private_key_file(
os.path.join(self.abs_path, self.auth_data["key"])
)
host = self.auth_data["host"]
port = self.auth_data["port"]
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# self.ssh.load_system_host_keys()
# self.ssh.get_host_keys().add(host, "ssh-rsa", k)
self.ssh.connect(
hostname=self.auth_data["host"],
username=self.auth_data["user"],
pkey=k,
port=self.auth_data["port"],
)
else:
self.ssh.connect(
hostname=self.auth_data["host"],
username=self.auth_data["user"],
password=self.auth_data["password"],
port=self.auth_data["port"],
)
def get_state(self) -> list:
self.connect()
@ -47,7 +52,8 @@ class Cluster:
def run_ansible_command(self, command: str, use_configs=False) -> list:
self.connect()
if use_configs:
command = "cd sinapse-configs/nodes/ansible && " + command
command = "cd devops-configs/ansible && " + command
print(command)
try:
ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(command)
ssh_stdout.channel.set_combine_stderr(True)
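# Editor's sketch of how the class above is driven (not part of the commit):
# config.ini needs a [CLUSTER] section providing host, port, user, and either
# password (use_key=False) or key, a private-key path relative to abs_path
# (use_key=True), matching the fields read in connect().
if __name__ == "__main__":
    abs_path = os.path.dirname(os.path.realpath(__file__))
    cluster = Cluster(os.path.join(abs_path, "config.ini"), abs_path, use_key=False)
    # use_configs=True prefixes the command with "cd devops-configs/ansible && "
    lines = cluster.run_ansible_command(
        'ansible-playbook --extra-vars "hosts=workers" playbooks/ping_workers.yml',
        use_configs=True,
    )
    print("".join(lines))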


@@ -5,7 +5,26 @@ import string
import time
from cluster import Cluster
from manager_agent import CLUSTER_STATE, NODES
HARD_STATE = {
"SYSTEM_MEMORY": 0,
"DEVICE_MEMORY": 0,
"CONTAINER_CAPACITY": 0,
"CPU_USAGE": None,
"DEVICE_USAGE": None,
}
CLUSTER_STATE = {
"1-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 1024,
"max_port": 1224,
},
}
NODES = 1
ABS_PATH = os.path.dirname(os.path.realpath(__file__))
CONFIG = os.path.join(ABS_PATH, "config.ini")


@@ -0,0 +1,8 @@
#!/bin/bash
mkdir -p /logs
touch /logs/error.log
touch /logs/access.log
cd src
# exec gunicorn --bind 0.0.0.0:5010 -c docker-gunicorn.conf.py app:app
ls -lash
exec gunicorn --bind 0.0.0.0:5010 app:app


@@ -1,547 +0,0 @@
import argparse
import errno
import os
import select
import socket
import sys
"""
Есть горячие и холодные ноды.
При старте работы на 6 горячих нод уже запущены 6 контейнеров по 1 на каждой. На холодных установлен только образ, запущенных нет.
Далее берём максимум в 12 контейнеров на ноде.
При превышении запросами текущего количества работающих контейнеров стартуют контейнеры на горячих нодах до 6 контейнеров на каждой ноде (36).
Далее при увеличении нагрузки включаются холодные ноды, и также до 6 контейнеров на ноде (30).
Далее по аналогии увеличение контейнеров на горячих до 12 на ноде (72), и для холодных (60).
При постоянном потоке запросов превышающим кол-во работающих контейнеров (132) отбрасываем сообщения, что кластер забит.
Если нагрузка спадает, то на горячих нодах контейнеры работают ещё в течение 10 минут, далее включаются до 1 на ноде. На холодных - сразу всё отключается до 0.
Реализовано это всё через функции постоянного опроса стейта кластера, сохранения стейта и счётчиков работающих контейнеров с меткой модуля
"""
"""
СТАТИЧЕСКАЯ БАЛАНСИРОВКА
11 нод в кластере: 6 горячих, 5 холодных.
Оценить влияние кол-ва лиц на предельно допустимые значения нагрузки.
МАКС КОНТЕЙНЕРОВ = 176 TODO - const value - find max values - ?
МАКС ПО ТИПАМ (1-16, 2-14, 3-12, 4-10) TODO - const value - find max values - ?
Добавить черный список контейнеров - ?
VS
Фильтрация параметров - ?
0. 6 контейнеров на горячих нодах 1:1, 0 контейнеров на холодных нодах
1. Приемка запроса на подписку от грады
2. Определение типа детектора (4 типа: 1-2-3-4)
3. Определение текущей загрузки горячих нод (прямое исполнение команды на мастере)
4. Если не достигли половины предела (6 контейнеров * 6 нод = 36), то используем текущие,
запускаем новые с другим типом детектора (аргумент через ENV)
5. По достижению пол-предела в 36 контейнеров начинается загрузка холодных нод по аналогии с п. 4 (5 * 6 = 30)
6. По достижению пол-предела в 36 + 30 = 66 контейнеров - старт загрузки горячих нод по пункту 4
7. По аналогии с холодными нодами до 132 (при 1 типе), убывание максимума обратно пропорционально
кол-во запущенных модулей с мощным детектором
"""
class LoadBalancer:
sendbuffer = 4096
maxbuffersize = 2**16
maxservers = 132
def __init__(
self,
host="0.0.0.0",
port=80,
targets=[],
fallback=None,
check_time=None,
debug=False,
family=socket.AF_INET,
typ=socket.SOCK_STREAM,
):
assert isinstance(host, str), ValueError("Expected string as host-argument")
assert isinstance(port, int), ValueError("Expected int as port-argument")
assert isinstance(targets, list), ValueError(
"Expected list as targets-arguments"
)
assert isinstance(fallback, tuple) or (fallback is None), ValueError(
"Expected tuple or None as fallback-argument"
)
assert (
isinstance(check_time, int)
or isinstance(check_time, float)
or (check_time is None)
), ValueError("Expected int,long or float as check_time-argument")
assert (check_time is None) or check_time >= 0, ValueError(
"Expected non-negative number as check_time-argument"
)
assert isinstance(debug, bool), ValueError("Expected bool as debug-argument")
for addr in targets:
assert len(addr) == 2, ValueError(
"Each address in targets needs to be a tuple of length 2"
)
assert isinstance(addr[0], str), ValueError(
"Each address in targets needs to have a str at index 0"
)
assert isinstance(addr[1], int), ValueError(
"Each address in targets needs to have a int at index 1"
)
assert addr[1] > 0, ValueError(
"Each address in targets needs to have a integer larger 0 as port"
)
self.host = host
self.port = port
self.targets = targets
self.fallback = fallback
self.check_time = check_time
self.debug = debug
self.family = family
self.type = typ
self.o2i = {}
self.i2o = {}
self.s2b = {}
self.o2t = {}
self.t2n = {}
self.running = False
self.bind_and_listen()
def bind_and_listen(self):
if self.debug:
print("binding...")
self.listen_s = socket.socket(self.family, self.type, 0)
self.listen_s.bind((self.host, self.port))
if self.debug:
print("bound to: {host}:{port}".format(host=self.host, port=self.port))
try:
self.listen_s.listen(3)
except:
self.listen_s.listen(1)
def mainloop(self):
if self.debug:
print("entering mainloop.")
self.running = True
try:
while self.running:
checkread = checkex = (
[self.listen_s] + list(self.i2o.keys()) + list(self.o2i.keys())
)
checkwrite = [s for s in self.s2b if len(self.s2b[s]) > 0]
toread, towrite, exceptional = select.select(
checkread, checkwrite, checkex, self.check_time
)
if self.debug:
print("data avaible on: ", toread)
print("buffer free on: ", towrite)
print("errors on: ", exceptional)
if len(toread) == 0 and len(towrite) == 0 and len(exceptional) == 0:
continue
for s in exceptional:
if s is self.listen_s:
if self.debug:
print("error in listening socket!")
self.running = False
raise RuntimeError("Error in listening socket!")
elif s in self.o2i:
if self.debug:
print("error in client connection: closing socket.")
self.close_s(s)
else:
s2c = s
c2s = self.i2o[s2c]
if c2s in self.o2t:
old_peer = self.o2t[c2s]
if old_peer == self.fallback or (self.fallback is None):
old_peer = None
else:
old_peer = None
if old_peer is None:
if self.debug:
print("fallback not possible, closing s")
self.close_s(c2s)
else:
if self.debug:
print(
"error in connection to normal server, instead connecting to fallback-server."
)
if self.debug:
print("unregistering old peer")
try:
s2c.shutdown(socket.SHUT_RDWR)
except:
pass
try:
s2c.close()
except:
pass
if s2c in self.s2b:
old_buff = self.s2b[s2c]
if self.debug:
print(
"redirecting {n} bytes to fallback server".format(
n=len(old_buff)
)
)
del self.s2b[s2c]
else:
old_buff = ""
if s2c in self.i2o:
del self.i2o[s2c]
if c2s in self.o2t:
t = self.o2t[c2s]
if t in self.t2n:
self.t2n[t] = max(0, self.t2n[t] - 1)
self.o2t[c2s] = self.fallback
peer = socket.socket(self.family, self.type, 0)
peer.setblocking(0)
self.s2b[peer] = old_buff
self.o2i[c2s] = peer
self.i2o[peer] = c2s
self.o2t[c2s] = self.fallback
try:
err = peer.connect_ex(self.fallback)
except Exception as e:
try:
err = e.args[0]
except:
err = None
if (
err == errno.EWOULDBLOCK
or err == getattr(errno, "WSAEWOULDBLOCK", None)
):
pass
else:
if self.debug:
print("error during connect to fallback:", e)
self.close_s(c2s)
continue
if err == errno.EINPROGRESS or err == getattr(errno, "WSAEWOULDBLOCK", None):
pass
else:
if self.debug:
print(
"error during connet to fallback:",
errno.errorcode[err],
)
self.close_s(c2s)
for s in towrite:
if s not in self.s2b:
continue
if len(self.s2b[s]) < self.sendbuffer:
tosend = self.s2b[s]
else:
tosend = self.s2b[s][: self.sendbuffer]
if self.debug:
print(
"sending {n} bytes (left: {t} bytes)".format(
n=len(tosend), t=len(self.s2b[s]) - len(tosend)
)
)
try:
sent = s.send(tosend)
except socket.error as e:
if self.debug:
print("error writing buffer:", e)
self.close_s(s)
continue
if self.debug:
print("sent {n} bytes.".format(n=sent))
if sent >= len(self.s2b[s]):
self.s2b[s] = ""
self.s2b[s] = self.s2b[s][sent:]
for s in toread:
if s is self.listen_s:
if self.debug:
print("got request")
if len(self.targets) == 0:
if self.debug:
print("using fallback server")
target = self.fallback
else:
c = 9999999
target = None
for ta in self.targets:
if ta not in self.t2n:
self.t2n[ta] = 0
n = self.t2n[ta]
if n < c:
c = n
target = ta
if target is None:
if self.debug:
print("cannot find a target!")
continue
if target in self.t2n:
self.t2n[target] += 1
else:
self.t2n[target] = 1
if self.debug:
print("target is:", target)
new_s, addr = self.listen_s.accept()
new_s.setblocking(0)
peer = socket.socket(self.family, self.type, 0)
peer.setblocking(0)
self.s2b[new_s] = ""
self.s2b[peer] = ""
self.o2i[new_s] = peer
self.i2o[peer] = new_s
self.o2t[new_s] = target
try:
err = peer.connect_ex(target)
except Exception as e:
try:
err = e.args[0]
except:
err = None
if err == errno.EWOULDBLOCK or err == getattr(errno, "WSAEWOULDBLOCK", None):
pass
else:
if self.debug:
print("error during connect to target:", e)
self.close_s(new_s)
continue
if err == errno.EINPROGRESS or err == getattr(errno, "WSAEWOULDBLOCK", None):
pass
else:
if self.debug:
print(
"error during connet to target:",
errno.errorcode[err],
)
self.close_s(new_s)
else:
if s in self.i2o:
p = self.i2o[s]
elif s in self.o2i:
p = self.o2i[s]
else:
if self.debug:
print("a socket has no peer registered, closing it.")
self.close_s(s)
continue
try:
peerbufferlength = len(self.s2b[p])
except KeyError:
peerbufferlength = 0
self.s2b[p] = ""
if peerbufferlength < self.maxbuffersize:
maxread = self.maxbuffersize - peerbufferlength
try:
data = s.recv(maxread)
except socket.error as e:
err = e.args[0]
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
continue
else:
if self.debug:
print("error while receiving:", e)
self.close_s(s)
continue
if self.debug:
print("reveived {n} bytes.".format(n=len(data)))
if len(data) == 0:
if self.debug:
print("connection closed.")
self.close_s(s)
else:
self.s2b[p] += data
else:
continue
finally:
if self.debug:
print("closing all connections...")
try:
self.listen_s.close()
except:
pass
for s in list(self.o2i.keys()) + list(self.i2o.keys()):
self.close_s(s)
def close_s(self, s):
try:
s.shutdown(socket.SHUT_RDWR)
except:
pass
try:
s.close()
except:
pass
if s in self.s2b:
del self.s2b[s]
if s in self.o2t:
t = self.o2t[s]
del self.o2t[s]
if t in self.t2n:
self.t2n[t] = max(self.t2n[t] - 1, 0)
if s in self.o2i:
p = self.o2i[s]
del self.o2i[s]
self.close_s(p)
if s in self.i2o:
p = self.i2o[s]
del self.i2o[s]
self.close_s(p)
def add_target(self, addr):
assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument")
assert len(addr) == 2, ValueError(
"Expected a tuple of length 2 as addr-argument"
)
assert isinstance(addr[0], str), ValueError(
"Expected addr-argument to have a string at index 0"
)
assert isinstance(addr[1], int), ValueError(
"Expected addr-argument to have an integer at index 1"
)
if addr in self.targets:
raise ValueError("Target already registered!")
else:
self.targets.append(addr)
def remove_target(self, addr):
assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument")
assert len(addr) == 2, ValueError(
"Expected a tuple of length 2 as addr-argument"
)
assert isinstance(addr[0], str), ValueError(
"Expected addr-argument to have a string at index 0"
)
assert isinstance(addr[1], int), ValueError(
"Expected addr-argument to have an integer at index 1"
)
if addr in self.targets:
self.targets.remove(addr)
else:
raise ValueError("Target not found!")
def stop(self):
if not self.running:
raise RuntimeError("Server not running!")
self.running = False
self.check_time = 0
def load_from_file(path, ignore_errors=False):
targets = []
with open(path, "rU") as f:
for line in f:
try:
if line.startswith("#"):
continue
if line.count(":") != 1:
raise SyntaxError(
"Error in line '{l}': expected exactly one ':'!".format(l=line)
)
host, port = line.split(":")
try:
port = int(port)
except ValueError:
raise SyntaxError(
"Error in line '{l}': cannot convert port to int!".format(
l=line
)
)
if len(host) == 0:
raise SyntaxError(
"Error in line '{l}': invalid host format!".format(l=line)
)
targets.append((host, port))
except SyntaxError:
if not ignore_errors:
raise
return targets
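# Editor's sketch of the targets-file format load_from_file expects (the
# filename below is hypothetical): one "host:port" per line, lines starting
# with "#" are skipped, and malformed lines raise SyntaxError unless
# ignore_errors=True.
def _example_load_targets():
    with open("targets.example", "w") as f:
        f.write("# backend pool\n10.0.0.1:8080\n10.0.0.2:8080\n")
    return load_from_file("targets.example")  # [("10.0.0.1", 8080), ("10.0.0.2", 8080)]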
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A Load-Balancer")
parser.add_argument("host", help="host/ip to bind to")
parser.add_argument("port", type=int, help="port to bind to")
parser.add_argument(
"-d", action="store_true", dest="debug", help="enable debug-mode"
)
parser.add_argument(
"-p",
action="store",
dest="path",
help="load target list from file",
required=False,
default=None,
)
parser.add_argument(
"-f",
action="store",
dest="fallback",
help="target to relay connections to if no server are aviable",
required=False,
default=None,
)
parser.add_argument(
"-t",
action="store",
dest="targets",
help="targets to spread connections to",
required=False,
default=None,
nargs="+",
)
ns = parser.parse_args()
if ns.targets is None:
targets = []
else:
targets = []
for ta in ns.targets:
if ta.count(":") != 1:
print(
"SyntaxError in command-line target-list: expected exactly one ':'!"
)
sys.exit(1)
host, port = ta.split(":")
if len(host) == 0:
print("SyntaxError in command-line target-list: invalid host!")
sys.exit(1)
try:
port = int(port)
if port <= 0:
raise ValueError
except ValueError:
print("SyntaxError in command-line target-list: invalid port!")
sys.exit(1)
targets.append((host, port))
if ns.fallback is not None:
if ns.fallback.count(":") != 1:
print("SyntaxError in fallback-argument: expected exactly one ':'!")
sys.exit(1)
host, port = ns.fallback.split(":")
if len(host) == 0:
print("SyntaxError in fallback-argument: invalid host!")
sys.exit(1)
try:
port = int(port)
if port <= 0:
raise ValueError
except ValueError:
print("SyntaxError in fallback-argument: invalid port!")
sys.exit(1)
fallback = (host, port)
else:
fallback = None
if ns.path is None:
pass
elif not os.path.isfile(ns.path):
print("Error: File not found!")
sys.exit(1)
else:
targets += load_from_file(ns.path, False)
if len(targets) == 0:
print("Error: no targets found!")
sys.exit(1)
lb = LoadBalancer(ns.host, ns.port, targets, fallback, None, ns.debug)
try:
lb.mainloop()
except (KeyboardInterrupt, SystemExit) as e:
pass
finally:
try:
lb.stop()
except RuntimeError as e:
pass
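# Editor's sketch of a CLI invocation (the script name and addresses are
# made up): listen on 0.0.0.0:8080, balance across two targets, fall back to
# a third when no target is reachable, with debug output enabled:
#   python load_balancer.py 0.0.0.0 8080 -t 10.0.0.1:5000 10.0.0.2:5000 -f 10.0.0.3:5000 -d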


@@ -1,228 +0,0 @@
import json
import os
import pprint
import random
import string
from typing import Union, Tuple, Any
from module_size_calculator import get_face_recognition_module_resource_usage
from cluster import Cluster
from utils.patterns import Singleton
MAX_SYSTEM_MEMORY = 3000 # Mb
MAX_DEVICE_MEMORY = 7500 # Mb
MAX_SCORE = 110 # %
# MIN_PORT = 1000
# MAX_PORT = 2000
USED_PORTS = set()
NODES = 11
ABS_PATH = os.path.dirname(os.path.realpath(__file__))
CONFIG = os.path.join(ABS_PATH, "config.ini")
USAGE_CONFIG = os.path.join(ABS_PATH, "docker_module_resource_usage.yaml")
RUNNING_STATE = "cluster_state_running_only.json"
FULL_STATE = "cluster_state_full.json"
SOFT_STATE = {"image": "", "detector_size": "", "max_faces": "", "min_fps": ""}
HARD_STATE = {
"SYSTEM_MEMORY": 0,
"DEVICE_MEMORY": 0,
"CONTAINER_CAPACITY": 0,
"CPU_USAGE": None,
"DEVICE_USAGE": None,
}
CLUSTER_STATE = {
# "1-node": {
# "soft_state": [],
# "hard_state": HARD_STATE,
# "running": 0,
# "min_port": 1024,
# "max_port": 1224,
# },
"2-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 2024,
"max_port": 2224,
},
"3-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 3024,
"max_port": 3224,
},
"4-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 4024,
"max_port": 4224,
},
"5-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 5024,
"max_port": 5224,
},
"6-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 6024,
"max_port": 6224,
},
"7-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 7024,
"max_port": 7224,
},
"8-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 8024,
"max_port": 8224,
},
"9-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 9024,
"max_port": 9224,
},
"10-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 10024,
"max_port": 10224,
},
"11-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 11024,
"max_port": 11224,
},
"12-node": {
"soft_state": [],
"hard_state": HARD_STATE,
"running": 0,
"min_port": 12024,
"max_port": 12224,
},
}
class Agent(Singleton):
def __init__(self, debug=False) -> None:
self.cluster = Cluster(CONFIG, ABS_PATH)
self.debug = debug
def read_cluster_state(self) -> None:
try:
with open(os.path.join(ABS_PATH, RUNNING_STATE), "r") as f:
running_state = json.load(f)
except FileNotFoundError as e:
raise FileNotFoundError(f"Cluster state is not available at {RUNNING_STATE}")
for node in running_state:
if node not in CLUSTER_STATE:
continue
CLUSTER_STATE[node]["running"] = running_state[node]["running"]
def sort_state(self) -> list:
sorted_state_list = sorted(CLUSTER_STATE.items(), key=lambda x: x[1]["running"])
if self.debug:
print("Sorted cluster state")
pprint.pprint(sorted_state_list)
return sorted_state_list
def get_node_port(
self,
input_image_number,
image_width,
image_height,
max_faces,
min_fps,
detector_size,
image,
) -> Union[None, Tuple[Any, int]]:
hard_state = get_face_recognition_module_resource_usage(
USAGE_CONFIG,
input_image_number=input_image_number,
image_width=image_width,
image_height=image_height,
max_faces=max_faces,
min_fps=min_fps,
detector_size=detector_size,
)
self.read_cluster_state()
sorted_state = self.sort_state()
for node in sorted_state:
future_sys_mem = (
CLUSTER_STATE[node[0]]["hard_state"]["SYSTEM_MEMORY"]
+ hard_state["SYSTEM_MEMORY"]
)
future_dev_mem = (
CLUSTER_STATE[node[0]]["hard_state"]["DEVICE_MEMORY"]
+ hard_state["DEVICE_MEMORY"]
)
future_node_cap = (
CLUSTER_STATE[node[0]]["hard_state"]["CONTAINER_CAPACITY"]
+ hard_state["CONTAINER_CAPACITY"]
)
if (
future_sys_mem < MAX_SYSTEM_MEMORY
and future_dev_mem < MAX_DEVICE_MEMORY
and future_node_cap < MAX_SCORE
):
CLUSTER_STATE[node[0]]["soft_state"].append(
{
"image": image,
"detector_size": detector_size,
"max_faces": max_faces,
"min_fps": min_fps,
}
)
CLUSTER_STATE[node[0]]["hard_state"] = hard_state
port = random.randint(
CLUSTER_STATE[node[0]]["min_port"],
CLUSTER_STATE[node[0]]["max_port"],
)
while port in USED_PORTS:
port = random.randint(
CLUSTER_STATE[node[0]]["min_port"],
CLUSTER_STATE[node[0]]["max_port"],
)
USED_PORTS.add(port)
if self.debug:
print("Future cluster state")
pprint.pprint(CLUSTER_STATE)
json_state = json.dumps(CLUSTER_STATE, indent=4)
with open(os.path.join(ABS_PATH, FULL_STATE), "w") as outfile:
outfile.write(json_state)
return node[0], port
else:
continue
if __name__ == "__main__":
agent = Agent()
node, port = agent.get_node_port(
input_image_number=8,
image_width=2560,
image_height=1440,
max_faces=5,
min_fps=12,
detector_size=640,
image="test",
)
print(node, port)


@@ -1,157 +0,0 @@
import asyncio
import configparser
import logging
from typing import List, Dict, Any, Optional, Tuple, Union
# ============================= Batch identification ==========================
import numpy as np
import copy
from detectors_stream_worker import DetectorsStreamWorker, DetectorsStreamWorkerManager
from utils.base64_convector import base64_2_vector
from face_identification import vector_identification
import os
CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini")
CONFIG = configparser.ConfigParser()
CONFIG.read(CONFIG_PATH)
CONF_DATA = dict(CONFIG.items("MANAGER"))
from db_tools import DataBaseManager
app_logger = logging.getLogger('APP')
def search_vectors_in_db(
lookupIds: List[str],
target,  # IDs of the requested vectors
) -> Union[str, bytes, List[List[float]]]:
manager = DataBaseManager(CONFIG_PATH)
lookupIds_vectors = manager.get_vectors(lookupIds, target=target)
return lookupIds_vectors
def batch_identification(
lookupIds: List[str],
faceVectorArchive: List[Dict[str, Any]],
recognitionThreshold: float,
):
"""
Функция поиска лиц из базы розыска в faceVectorArchive.
Args:
lookupVectors (List[str]): список id-шников векторов лиц из базы поиска людей,
с которыми необходимо сравнивать незнакомые лица. Формат:
[
de630dfa-158a-4d20-a903-ded922132124,
...
]
faceVectorArchive (List[Dict[str, Any]]): вектора лиц с видеокамер, которые необходимо сравнить
с людьми из базы поиска. Формат:
[
{
vectorId: de630dfa-158a-4d20-a903-ded922132124,
faceVector: base64-строка (закодированные 512 флотов)
}
...
]
recognitionThreshold (float): пороговое значение сходства, меньше которого лица будут считаться различными.
Return:
{
matches: [
{
faceVectorId: de630dfa-158a-4d20-a903-ded922132124, - id вектора из архива,
на котором нашлось совпадение
vectorId: de630dfa-158a-4d20-a903-ded922132124, - id вектора из базы розыска
recognitionConfidence: float - уровень уверенности (должно быть больше чем threshold,
чтобы детекция попала в совпадения)
},
...
]
}
"""
# Find the lookup-database vectors by lookupIds
lookupIds_vectors: Union[str, bytes, List[List[float]]] = search_vectors_in_db(
lookupIds, target="base_id"
)
# Build the lookup-database structure
lookupVectors: List[Dict[str, Any]] = [
{"vectorId": name, "faceVector": vector}
for name, vector in zip(lookupIds, lookupIds_vectors)
]
# Decode the vectors in the search bases
# Walk through both search bases
faceVectorArchive = copy.deepcopy(faceVectorArchive)
for search_base in [lookupVectors, faceVectorArchive]:
# Walk through each record
for row in search_base:
# If the vector is a str or bytes, decode it
vector = row.get("faceVector")
if isinstance(vector, (str, bytes)):
row["faceVector"] = base64_2_vector(vector, dtype=np.float32).tolist()
# Run the identification search
identification_result = vector_identification(
lookupVectors=lookupVectors,
faceVectorArchive=faceVectorArchive,
recognitionThreshold=recognitionThreshold,
)
# Assemble the result
result = [
{
"vectorId": row["vectorId"],
"faceVectorId": row["detectionId"],
"recognitionConfidence": row["recognitionConfidence"],
}
for row in identification_result
]
return {"matches": result}
def subscribe_to_detections(
stream: str,
detectionFrequency: int,
recognitionThreshold: float,
searchBaseIds: List[str],
bboxStream: Optional[str] = None,
eventsStream: Optional[str] = None,
bboxThreshold: Optional[float] = None,
detectionThreshold: Optional[float] = None,
controlArea: Optional[List[Tuple[int, int]]] = None,
minFaceSize: Optional[float] = None,
maxFacesPerFrame: Optional[int] = None,
enableAttributes: bool = False
):
worker = DetectorsStreamWorker(
stream=stream,
detectionFrequency=detectionFrequency,
recognitionThreshold=recognitionThreshold,
searchBaseIds=searchBaseIds,
bboxStream=bboxStream,
eventsStream=eventsStream,
bboxThreshold=bboxThreshold,
detectionThreshold=detectionThreshold,
controlArea=controlArea,
minFaceSize=minFaceSize,
maxFacesPerFrame=maxFacesPerFrame,
enableAttributes=enableAttributes,
)
# Obtain the asyncio event loop
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
app_logger.debug('worker create')
try:
loop.run_until_complete(worker.create())
except Exception as e:
app_logger.debug(f'worker close with error: {e}')
loop.run_until_complete(worker.close())
raise e
else:
app_logger.debug('worker run')
DetectorsStreamWorkerManager.add(worker)