diff --git a/.gitignore b/.gitignore
index b504c84..e1a259e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,8 @@
 simar-key
-config.ini
\ No newline at end of file
+config.ini
+.venv
+*.log
+__pycache__/
+*.pyc
+*.egg-info/
+.pytest_cache/
\ No newline at end of file
diff --git a/master-node/src/app.py b/master-node/src/app.py
index fc5acaf..8b9afc3 100644
--- a/master-node/src/app.py
+++ b/master-node/src/app.py
@@ -1,8 +1,6 @@
 import os
-import logging
 import configparser
 import datetime
-import json
 
 from flask import (
     Flask,
@@ -21,45 +19,39 @@ from werkzeug.utils import secure_filename
 from werkzeug.security import generate_password_hash, check_password_hash
 
 ABS_PATH = os.path.dirname(os.path.realpath(__file__))
-UPLOAD_FOLDER = os.path.join(ABS_PATH, "tmp")
-ALLOWED_EXTENSIONS = {"csv", "mp4"}
 MAX_CONTENT_PATH = 1000000000
 CONFIG = os.path.join(ABS_PATH, "config.ini")
 SCHEDULER_API_ENABLED = True
 SHOW_LOG = True
 
 from logger import LoggerFactory
 
-# Configure the logging factory
+
+
 LoggerFactory.setting(
     log_level=os.getenv("LOG_LEVEL", "INFO"),
-    log_format='[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s',
+    log_format="[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s",
     show=True,
 )
 
 from cluster import Cluster
-from manager_utils import batch_identification, subscribe_to_detections
 from cluster_state import AutoState
 
 app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates"))
-app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
 app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH
 app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED
 app.config["SESSION_TYPE"] = "filesystem"
 auth = HTTPBasicAuth()
 scheduler = APScheduler()
-app_logger = LoggerFactory.get_logger('APP')
+app_logger = LoggerFactory.get_logger("APP")
 AUTO = AutoState(debug=False)
 
-@scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900)
-def cluster_state():
-    AUTO.check_cluster_state()
-    # app_logger.debug("Finished with auto cluster state")
-
-
-
+# @scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900)
+# def cluster_state():
+#     AUTO.check_cluster_state()
+#     # app_logger.debug("Finished with auto cluster state")
 
 
 def get_config() -> dict:
@@ -83,71 +75,19 @@ def verify_password(username, password):
         return username
 
 
-def allowed_file(filename):
-    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
-
-
-@app.route("/")
+# curl -u : -d "playbook=ping_workers&args=hosts=workers" -v http://localhost:5010/api/v1.0/run_ansible
+@app.route("/api/v1.0/run_ansible", methods=["POST"])
 @auth.login_required
-def home():
-    return render_template("index.html")
-
-
-@app.route("/faces")
-def faces():
-    return render_template("faces.html")
-
-
-@app.route("/cluster", methods=["GET", "POST"])
-def cluster_activity():
-    if request.method == "POST":
-        if request.form.get("get_state") == "Get state":
-            try:
-                cluster = Cluster(CONFIG, ABS_PATH)
-                command = (
-                    "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml"
-                )
-                lines = cluster.run_ansible_command(command, use_configs=True)
-                state = "".join(lines).replace("\\n", "\n")
-                flash(state, category="success")
-            except Exception as e:
-                flash(f"Get state failed with: {e}", category="error")
-                app_logger.error(f"Get state failed with: {e}")
-        return redirect(request.url)
-
-    if request.method == "GET":
-        return render_template("cluster.html")
-
-
-@app.route("/faces/batch", methods=["GET", "POST"])
-def batch():
-    if request.method == "POST":
-        lookupIds = request.form.get("lookupIds")
-        faceVectorArchive = request.form.get("faceVectorArchive")
-        recognitionThreshold = request.form.get("recognitionThreshold")
-        try:
-            matches = batch_identification(
-                lookupIds, faceVectorArchive, recognitionThreshold
-            )
-            flash(
-                f"Batch identification match {matches}",
-                category="success",
-            )
-        except Exception as e:
-            flash(f"Batch identification failed with error: {e}", category="error")
-            app_logger.error(f"Batch identification failed with error: {e}")
-        return redirect(request.url)
-    if request.method == "GET":
-        return render_template("batch.html")
-
-# curl -v -u : http://localhost:5010/faces/api/v1.0/get_cluster_state
-@app.route("/api/faces/v1.0/get-cluster-state", methods=["GET"])
-@auth.login_required
-def get_cluster_activity():
+def run_ansible():
     try:
-        cluster = Cluster(CONFIG, ABS_PATH)
-        command = "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml"
-        state = cluster.run_ansible_command(command, use_configs=True)
+        cluster = Cluster(CONFIG, ABS_PATH, use_key=False)
+        playbook = request.form.get("playbook")
+        playbook_args = request.form.get("args")
+        if playbook and playbook_args:
+            command = f"""ansible-playbook --extra-vars "{playbook_args}" playbooks/{playbook}.yml"""
+            state = cluster.run_ansible_command(command, use_configs=True)
+        else:
+            state = "No playbook and playbook args given"
         time = datetime.datetime.now()
         data = {
             "message": state,
@@ -156,47 +96,11 @@ def get_cluster_activity():
         }
         return make_response(jsonify(data), 200)
     except Exception as e:
-        data = {"message": f"Get state failed with error: {e}", "code": "FAILED"}
+        data = {"message": f"Run ansible failed with error: {e}", "code": "FAILED"}
         app_logger.error(data)
         return make_response(jsonify(data), 400)
 
 
-# curl -v -u : -X POST -H "Content-type: application/json" -d @test_input.json http://localhost:5010/faces/api/v1.0/batch
-@app.route("/api/faces/v1.0/batch", methods=["POST"])
-@auth.login_required
-def batch_id():
-    content_type = request.headers.get("Content-Type")
-    if content_type == "application/json":
-        if request.is_json:
-            data = request.json
-            lookupIds = data.get("lookupIds")
-            faceVectorArchive = data.get("faceVectorArchive")
-            recognitionThreshold = data.get("recognitionThreshold")
-            try:
-                matches = batch_identification(
-                    lookupIds, faceVectorArchive, recognitionThreshold
-                )
-                time = datetime.datetime.now()
-                data = {
-                    "message": matches,
-                    "code": "SUCCESS",
-                    "insertTimestamp": time,
-                }
-                return make_response(jsonify(data), 200)
-            except Exception as e:
-                data = {
-                    "message": f"Batch identification failed with error: {e}",
-                    "code": "FAILED",
-                }
-                app_logger.error(data)
-                return make_response(jsonify(data), 400)
-        else:
-            data = {"message": f"Invalid json: {request}", "code": "FAILED"}
-            return make_response(jsonify(data), 400)
-    else:
-        data = {"message": f"Wrong content type: {content_type}", "code": "FAILED"}
-        return make_response(jsonify(data), 400)
-
-
 def run_app():
     port = int(os.environ.get("PORT", 5010))
     app_logger.info(ABS_PATH)
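For reference, a sketch of exercising the new endpoint along the lines of the curl hint in the hunk above. The credentials, playbook name, and ansible output below are illustrative assumptions; the response fields (message, code, insertTimestamp) and the 200/400 status codes come from the handler itself, with message carrying the list of output lines returned by run_ansible_command:

    $ curl -u user:pass \
        -d "playbook=ping_workers" -d "args=hosts=workers" \
        http://localhost:5010/api/v1.0/run_ansible
    {
      "code": "SUCCESS",
      "insertTimestamp": "Mon, 01 Jan 2024 12:00:00 GMT",
      "message": ["PLAY [workers] ...", "..."]
    }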
diff --git a/master-node/src/cluster.py b/master-node/src/cluster.py
index c28f693..c3b2658 100644
--- a/master-node/src/cluster.py
+++ b/master-node/src/cluster.py
@@ -7,28 +7,33 @@ from tabulate import tabulate
 
 
 class Cluster:
-    def __init__(self, config_path: str, abs_path: str) -> None:
+    def __init__(self, config_path: str, abs_path: str, use_key=False) -> None:
         self.abs_path = abs_path
         self.config = configparser.ConfigParser()
         self.config.read(config_path)
         self.auth_data = dict(self.config.items("CLUSTER"))
         self.ssh = paramiko.SSHClient()
+        self.use_key = use_key
 
     def connect(self) -> None:
-        k = paramiko.RSAKey.from_private_key_file(
-            os.path.join(self.abs_path, self.auth_data["key"])
-        )
-        host = self.auth_data["host"]
-        port = self.auth_data["port"]
         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        # self.ssh.load_system_host_keys()
-        # self.ssh.get_host_keys().add(host, "ssh-rsa", k)
-        self.ssh.connect(
-            hostname=self.auth_data["host"],
-            username=self.auth_data["user"],
-            pkey=k,
-            port=self.auth_data["port"],
-        )
+        if self.use_key:
+            k = paramiko.RSAKey.from_private_key_file(
+                os.path.join(self.abs_path, self.auth_data["key"])
+            )
+            self.ssh.connect(
+                hostname=self.auth_data["host"],
+                username=self.auth_data["user"],
+                pkey=k,
+                port=self.auth_data["port"],
+            )
+        else:
+            self.ssh.connect(
+                hostname=self.auth_data["host"],
+                username=self.auth_data["user"],
+                password=self.auth_data["password"],
+                port=self.auth_data["port"],
+            )
 
     def get_state(self) -> list:
         self.connect()
@@ -47,7 +52,8 @@ class Cluster:
     def run_ansible_command(self, command: str, use_configs=False) -> list:
         self.connect()
         if use_configs:
-            command = "cd sinapse-configs/nodes/ansible && " + command
+            command = "cd devops-configs/ansible && " + command
+        print(command)
         try:
             ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(command)
             ssh_stdout.channel.set_combine_stderr(True)
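Cluster reads its SSH settings from the [CLUSTER] section of config.ini (gitignored above). A hypothetical example showing the keys the class accesses: host, port, and user are always read; key (a path relative to the app directory) is used when use_key=True, and password otherwise. All values below are placeholders, not the project's real settings:

    [CLUSTER]
    host = 192.0.2.10
    port = 22
    user = deploy
    password = changeme
    key = simar-key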
diff --git a/master-node/src/cluster_state.py b/master-node/src/cluster_state.py
index 7876214..d6321cb 100644
--- a/master-node/src/cluster_state.py
+++ b/master-node/src/cluster_state.py
@@ -5,7 +5,26 @@ import string
 import time
 
 from cluster import Cluster
-from manager_agent import CLUSTER_STATE, NODES
+
+HARD_STATE = {
+    "SYSTEM_MEMORY": 0,
+    "DEVICE_MEMORY": 0,
+    "CONTAINER_CAPACITY": 0,
+    "CPU_USAGE": None,
+    "DEVICE_USAGE": None,
+}
+
+CLUSTER_STATE = {
+    "1-node": {
+        "soft_state": [],
+        "hard_state": HARD_STATE,
+        "running": 0,
+        "min_port": 1024,
+        "max_port": 1224,
+    },
+}
+
+NODES = 1
 
 ABS_PATH = os.path.dirname(os.path.realpath(__file__))
 CONFIG = os.path.join(ABS_PATH, "config.ini")
diff --git a/master-node/src/entrypoint.sh b/master-node/src/entrypoint.sh
new file mode 100644
index 0000000..d42c23c
--- /dev/null
+++ b/master-node/src/entrypoint.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+mkdir -p /logs
+touch /logs/error.log
+touch /logs/access.log
+cd src
+# exec gunicorn --bind 0.0.0.0:5010 -c docker-gunicorn.conf.py app:app
+ls -lash
+exec gunicorn --bind 0.0.0.0:5010 app:app
\ No newline at end of file
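The commented-out exec line refers to a docker-gunicorn.conf.py that is not included in this diff. A minimal sketch of what such a file could contain, reusing the /logs files the entrypoint creates; the worker count and every other value here are assumptions, not the project's actual config:

    # docker-gunicorn.conf.py - hypothetical sketch, not part of this repo
    bind = "0.0.0.0:5010"
    accesslog = "/logs/access.log"
    errorlog = "/logs/error.log"
    workers = 2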
diff --git a/master-node/src/load_balancer.py b/master-node/src/load_balancer.py
deleted file mode 100644
index f31ebb8..0000000
--- a/master-node/src/load_balancer.py
+++ /dev/null
@@ -1,547 +0,0 @@
-import argparse
-import errno
-import os
-import select
-import socket
-import sys
-
-"""
-There are hot and cold nodes.
-At startup the 6 hot nodes are already running 6 containers, 1 on each. The cold nodes only have the image installed, with nothing running.
-From there we allow a maximum of 12 containers per node.
-When requests exceed the current number of running containers, containers are started on the hot nodes, up to 6 containers on each node (36).
-As the load grows further, the cold nodes are brought in, likewise up to 6 containers per node (30).
-Then, by analogy, the hot nodes are scaled up to 12 containers per node (72), and the cold ones likewise (60).
-Under a sustained stream of requests exceeding the number of running containers (132), we drop messages saying that the cluster is full.
-If the load subsides, the containers on the hot nodes keep running for another 10 minutes and then drop to 1 per node. On the cold nodes everything shuts down to 0 immediately.
-
-All of this is implemented through functions that continuously poll the cluster state, persist that state, and keep counters of running containers tagged with the module label
-"""
-"""
-STATIC BALANCING
-11 nodes in the cluster: 6 hot, 5 cold.
-Assess how the number of faces affects the maximum allowed load values.
-MAX CONTAINERS = 176 TODO - const value - find max values - ?
-MAX PER TYPE (1-16, 2-14, 3-12, 4-10) TODO - const value - find max values - ?
-Add a container blacklist - ? - VS
-Parameter filtering - ?
-
-0. 6 containers on the hot nodes 1:1, 0 containers on the cold nodes
-1. Accept a subscription request from Grada
-2. Determine the detector type (4 types: 1-2-3-4)
-3. Determine the current load of the hot nodes (direct command execution on the master)
-4. If half the limit has not been reached (6 containers * 6 nodes = 36), use the running containers and
-start new ones with a different detector type (argument via ENV)
-5. Once the half-limit of 36 containers is reached, the cold nodes start loading by analogy with step 4 (5 * 6 = 30)
-6. Once the half-limit of 36 + 30 = 66 containers is reached, the hot nodes start loading again as in step 4
-7. By analogy with the cold nodes, up to 132 (for type 1); the maximum decreases in inverse proportion
-to the number of running modules with a heavyweight detector
-"""
-
-
-class LoadBalancer:
-    sendbuffer = 4096
-    maxbuffersize = 2**16
-    maxservers = 132
-
-    def __init__(
-        self,
-        host="0.0.0.0",
-        port=80,
-        targets=[],
-        fallback=None,
-        check_time=None,
-        debug=False,
-        family=socket.AF_INET,
-        typ=socket.SOCK_STREAM,
-    ):
-        assert isinstance(host, str), ValueError("Expected string as host-argument")
-        assert isinstance(port, int), ValueError("Expected int as port-argument")
-        assert isinstance(targets, list), ValueError(
-            "Expected list as targets-arguments"
-        )
-        assert isinstance(fallback, tuple) or (fallback is None), ValueError(
-            "Expected tuple or None as fallback-argument"
-        )
-        assert (
-            isinstance(check_time, int)
-            or isinstance(check_time, float)
-            or (check_time is None)
-        ), ValueError("Expected int,long or float as check_time-argument")
-        assert check_time >= 0 or (check_time is None), ValueError(
-            "Expected positive integer as check_time-argument"
-        )
-        assert isinstance(debug, bool), ValueError("Expected bool as debug-argument")
-        for addr in targets:
-            assert len(addr) == 2, ValueError(
-                "Each address in targets needs to be a tuple of length 2"
-            )
-            assert isinstance(addr[0], str), ValueError(
-                "Each address in targets needs to have a str at index 0"
-            )
-            assert isinstance(addr[1], int), ValueError(
-                "Each address in targets needs to have a int at index 1"
-            )
-            assert addr[1] > 0, ValueError(
-                "Each address in targets needs to have a integer larger 0 as port"
-            )
-        self.host = host
-        self.port = port
-        self.targets = targets
-        self.fallback = fallback
-        self.check_time = check_time
-        self.debug = debug
-        self.family = family
-        self.type = typ
-        self.o2i = {}
-        self.i2o = {}
-        self.s2b = {}
-        self.o2t = {}
-        self.t2n = {}
-        self.running = False
-        self.bind_and_listen()
-
-    def bind_and_listen(self):
-        if self.debug:
-            print("binding...")
-        self.listen_s = socket.socket(self.family, self.type, 0)
-        self.listen_s.bind((self.host, self.port))
-        if self.debug:
-            print("bound to: {host}:{port}".format(host=self.host, port=self.port))
-        try:
-            self.listen_s.listen(3)
-        except:
-            self.listen_s.listen(1)
- "" - ) - if self.debug: - print("unregistering old peer") - try: - s2c.shutdown(socket.SHUT_RDWR) - except: - pass - try: - s2c.close() - except: - pass - if s2c in self.s2b: - old_buff = self.s2b[s2c] - if self.debug: - print( - "redirecting {n} bytes to fallback server".format( - n=len(old_buff) - ) - ) - del self.s2b[s2c] - else: - old_buff = "" - if s2c in self.i2o: - del self.i2o[s2c] - if c2s in self.o2t: - t = self.o2t[c2s] - if t in self.t2n: - self.t2n[t] = max(0, self.t2n[t] - 1) - self.o2t[c2s] = self.fallback - peer = socket.socket(self.family, self.type, 0) - peer.setblocking(0) - self.s2b[peer] = old_buff - self.o2i[c2s] = peer - self.i2o[peer] = c2s - self.o2t[c2s] = self.fallback - try: - err = peer.connect_ex(self.fallback) - except Exception as e: - try: - err = e.args[0] - except: - err = None - if ( - err == errno.EWOULDBLOCK - or err == errno.WSAEWOULDBLOCK - ): - pass - else: - if self.debug: - print("error during connect to fallback:", e) - self.close_s(c2s) - continue - if err == errno.EINPROGRESS or err == errno.WSAEWOULDBLOCK: - pass - else: - if self.debug: - print( - "error during connet to fallback:", - errno.errorcode[err], - ) - self.close_s(c2s) - for s in towrite: - if s not in self.s2b: - continue - if len(self.s2b[s]) < self.sendbuffer: - tosend = self.s2b[s] - else: - tosend = self.s2b[s][: self.sendbuffer] - if self.debug: - print( - "sending {n} bytes (left: {t} bytes)".format( - n=len(tosend), t=len(self.s2b[s]) - len(tosend) - ) - ) - try: - sent = s.send(tosend) - except socket.error as e: - if self.debug: - print("error writing buffer:", e) - self.close_s(s) - continue - if self.debug: - print("sent {n} bytes.".format(n=sent)) - if sent >= len(self.s2b[s]): - self.s2b[s] = "" - self.s2b[s] = self.s2b[s][sent:] - for s in toread: - if s is self.listen_s: - if self.debug: - print("got request") - if len(self.targets) == 0: - if self.debug: - print("using fallback server") - target = self.fallback - else: - c = 9999999 - target = None - for ta in targets: - if ta not in self.t2n: - self.t2n[ta] = 0 - n = self.t2n[ta] - if n < c: - c = n - target = ta - if target is None: - if self.debug: - print("cannot find a target!") - continue - if target in self.t2n: - self.t2n[target] += 1 - else: - self.t2n[target] = 1 - if self.debug: - print("target is:", target) - new_s, addr = self.listen_s.accept() - new_s.setblocking(0) - peer = socket.socket(self.family, self.type, 0) - peer.setblocking(0) - self.s2b[new_s] = "" - self.s2b[peer] = "" - self.o2i[new_s] = peer - self.i2o[peer] = new_s - self.o2t[new_s] = target - try: - err = peer.connect_ex(target) - except Exception as e: - try: - err = e.args[0] - except: - err = None - if err == errno.EWOULDBLOCK or err == errno.WSAEWOULDBLOCK: - pass - else: - if self.debug: - print("error during connect to target:", e) - self.close_s(new_s) - continue - if err == errno.EINPROGRESS or err == errno.WSAEWOULDBLOCK: - pass - else: - if self.debug: - print( - "error during connet to target:", - errno.errorcode[err], - ) - self.close_s(new_s) - else: - if s in self.i2o: - p = self.i2o[s] - elif s in self.o2i: - p = self.o2i[s] - else: - if self.debug: - print("a socket has no peer registered, closing it.") - self.close_s(s) - continue - try: - peerbufferlength = len(self.s2b[p]) - except KeyError: - peerbufferlength = 0 - self.s2b[p] = "" - if peerbufferlength < self.maxbuffersize: - maxread = self.maxbuffersize - peerbufferlength - try: - data = s.recv(maxread) - except socket.error as e: - err = e.args[0] - if err == 
-                                else:
-                                    if self.debug:
-                                        print("error while receiving:", e)
-                                    self.close_s(s)
-                                    continue
-                            if self.debug:
-                                print("reveived {n} bytes.".format(n=len(data)))
-                            if len(data) == 0:
-                                if self.debug:
-                                    print("connection closed.")
-                                self.close_s(s)
-                            else:
-                                self.s2b[p] += data
-                        else:
-                            continue
-        finally:
-            if self.debug:
-                print("closing all connections...")
-            try:
-                self.listen_s.close()
-            except:
-                pass
-            for s in self.o2i.keys() + self.i2o.keys():
-                self.close_s(s)
-
-    def close_s(self, s):
-        try:
-            s.shutdown(socket.SHUT_RDWR)
-        except:
-            pass
-        try:
-            s.close()
-        except:
-            pass
-        if s in self.s2b:
-            del self.s2b[s]
-        if s in self.o2t:
-            t = self.o2t[s]
-            del self.o2t[s]
-            if t in self.t2n:
-                self.t2n[t] = max(self.t2n[t] - 1, 0)
-        if s in self.o2i:
-            p = self.o2i[s]
-            del self.o2i[s]
-            self.close_s(p)
-        if s in self.i2o:
-            p = self.i2o[s]
-            del self.i2o[s]
-            self.close_s(p)
-
-    def add_target(self, addr):
-        assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument")
-        assert len(addr) == 2, ValueError(
-            "Expected a tuple of length 2 as addr-argument"
-        )
-        assert isinstance(addr[0], str), ValueError(
-            "Expected addr-argument to have a string at index 0"
-        )
-        assert isinstance(addr[1], str), ValueError(
-            "Expected addr-argument to have a integer at index 1"
-        )
-        if addr in self.targets:
-            raise ValueError("Target already registered!")
-        else:
-            self.targets.append(addr)
-
-    def remove_target(self, addr):
-        assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument")
-        assert len(addr) == 2, ValueError(
-            "Expected a tuple of length 2 as addr-argument"
-        )
-        assert isinstance(addr[0], str), ValueError(
-            "Expected addr-argument to have a string at index 0"
-        )
-        assert isinstance(addr[1], str), ValueError(
-            "Expected addr-argument to have a integer at index 1"
-        )
-        if addr in self.targets:
-            self.targets.remove(addr)
-        else:
-            raise ValueError("Target not found!")
-
-    def stop(self):
-        if not self.running:
-            raise RuntimeError("Server not running!")
-        self.running = False
-        self.check_time = 0
-
-
-def load_from_file(path, ignore_errors=False):
-    targets = []
-    with open(path, "rU") as f:
-        for line in f:
-            try:
-                if line.startswith("#"):
-                    continue
-                if line.count(":") != 1:
-                    raise SyntaxError(
-                        "Error in line '{l}': expected exactly one ':'!".format(l=line)
-                    )
-                host, port = line.split(":")
-                try:
-                    port = int(port)
-                except ValueError:
-                    raise SyntaxError(
-                        "Error in line '{l}': cannot convert port to int!".format(
-                            l=line
-                        )
-                    )
-                if len(host) == 0:
-                    raise SyntaxError(
-                        "Error in line '{l}': invalid host format!".format(l=line)
-                    )
-                targets.append((host, port))
-            except SyntaxError:
-                if not ignore_errors:
-                    raise
-    return targets
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="A Load-Balancer")
-    parser.add_argument("host", help="host/ip to bind to")
-    parser.add_argument("port", type=int, help="port to bind to")
-    parser.add_argument(
-        "-d", action="store_true", dest="debug", help="enable debug-mode"
-    )
-    parser.add_argument(
-        "-p",
-        action="store",
-        dest="path",
-        help="load target list from file",
-        required=False,
-        default=None,
-    )
-    parser.add_argument(
-        "-f",
-        action="store",
-        dest="fallback",
-        help="target to relay connections to if no server are aviable",
-        required=False,
-        default=None,
-    )
-    parser.add_argument(
-        "-t",
-        action="store",
-        dest="targets",
-        help="targets to spread connections to",
-        required=False,
-        default=None,
-        nargs="+",
-    )
-    ns = parser.parse_args()
-    if ns.targets is None:
-        targets = []
-    else:
-        targets = []
-        for ta in ns.targets:
-            if ta.count(":") != 1:
-                print(
-                    "SyntaxError in command-line target-list: expected exactly one ':'!"
-                )
-                sys.exit(1)
-            host, port = ta.split(":")
-            if len(host) == 0:
-                print("SyntaxError in command-line target-list: invalid host!")
-                sys.exit(1)
-            try:
-                port = int(port)
-                if port <= 0:
-                    raise ValueError
-            except ValueError:
-                print("SyntaxError in command-line target-list: invalid port!")
-                sys.exit(1)
-            targets.append((host, port))
-    if ns.fallback is not None:
-        if ns.fallback.count(":") != 1:
-            print("SyntaxError in fallback-argument: expected exactly one ':'!")
-            sys.exit(1)
-        host, port = ns.fallback.split(":")
-        if len(host) == 0:
-            print("SyntaxError in fallback-argument: invalid host!")
-            sys.exit(1)
-        try:
-            port = int(port)
-            if port <= 0:
-                raise ValueError
-        except ValueError:
-            print("SyntaxError in fallback-argument: invalid port!")
-            sys.exit(1)
-        fallback = (host, port)
-    else:
-        fallback = None
-    if ns.path is None:
-        pass
-    elif not os.path.isfile(ns.path):
-        print("Error: File not found!")
-        sys.exit(1)
-    else:
-        targets += load_from_file(ns.path, False)
-    if len(targets) == 0:
-        print("Error: no targets found!")
-        sys.exit(1)
-    lb = LoadBalancer(ns.host, ns.port, targets, fallback, None, ns.debug)
-    try:
-        lb.mainloop()
-    except (KeyboardInterrupt, SystemExit) as e:
-        pass
-    finally:
-        try:
-            lb.stop()
-        except RuntimeError as e:
-            pass
diff --git a/master-node/src/manager_agent.py b/master-node/src/manager_agent.py
deleted file mode 100644
index 89429b6..0000000
--- a/master-node/src/manager_agent.py
+++ /dev/null
@@ -1,228 +0,0 @@
-import json
-import os
-import pprint
-import random
-import string
-from typing import Union, Tuple, Any
-
-from module_size_calculator import get_face_recognition_module_resource_usage
-from cluster import Cluster
-from utils.patterns import Singleton
-
-MAX_SYSTEM_MEMORY = 3000  # Mb
-MAX_DEVICE_MEMORY = 7500  # Mb
-MAX_SCORE = 110  # %
-
-# MIN_PORT = 1000
-# MAX_PORT = 2000
-USED_PORTS = set()
-
-NODES = 11
-
-ABS_PATH = os.path.dirname(os.path.realpath(__file__))
-CONFIG = os.path.join(ABS_PATH, "config.ini")
-USAGE_CONFIG = os.path.join(ABS_PATH, "docker_module_resource_usage.yaml")
-RUNNING_STATE = "cluster_state_running_only.json"
-FULL_STATE = "cluster_state_full.json"
-SOFT_STATE = {"image": "", "detector_size": "", "max_faces": "", "min_fps": ""}
-
-HARD_STATE = {
-    "SYSTEM_MEMORY": 0,
-    "DEVICE_MEMORY": 0,
-    "CONTAINER_CAPACITY": 0,
-    "CPU_USAGE": None,
-    "DEVICE_USAGE": None,
-}
-
-CLUSTER_STATE = {
-    # "1-node": {
-    #     "soft_state": [],
-    #     "hard_state": HARD_STATE,
-    #     "running": 0,
-    #     "min_port": 1024,
-    #     "max_port": 1224,
-    # },
-    "2-node": {
-        "soft_state": [],
-        "hard_state": HARD_STATE,
-        "running": 0,
-        "min_port": 2024,
-        "max_port": 2224,
-    },
-    "3-node": {
-        "soft_state": [],
-        "hard_state": HARD_STATE,
-        "running": 0,
-        "min_port": 3024,
-        "max_port": 3224,
-    },
-    "4-node": {
-        "soft_state": [],
-        "hard_state": HARD_STATE,
-        "running": 0,
-        "min_port": 4024,
-        "max_port": 4224,
-    },
-    "5-node": {
-        "soft_state": [],
-        "hard_state": HARD_STATE,
-        "running": 0,
-        "min_port": 5024,
-        "max_port": 5224,
-    },
-    "6-node": {
-        "soft_state": [],
-        "hard_state": HARD_STATE,
-        "running": 0,
-        "min_port": 6024,
-        "max_port": 6224,
-    },
-    "7-node": {
-        "soft_state": [],
-        "hard_state": HARD_STATE,
-        "running": 0,
-        "min_port": 7024,
-        "max_port": 7224,
-    },
-    "8-node": {
"soft_state": [], - "hard_state": HARD_STATE, - "running": 0, - "min_port": 8024, - "max_port": 8224, - }, - "9-node": { - "soft_state": [], - "hard_state": HARD_STATE, - "running": 0, - "min_port": 9024, - "max_port": 9224, - }, - "10-node": { - "soft_state": [], - "hard_state": HARD_STATE, - "running": 0, - "min_port": 10024, - "max_port": 10224, - }, - "11-node": { - "soft_state": [], - "hard_state": HARD_STATE, - "running": 0, - "min_port": 11024, - "max_port": 11224, - }, - "12-node": { - "soft_state": [], - "hard_state": HARD_STATE, - "running": 0, - "min_port": 12024, - "max_port": 12224, - }, -} - - -class Agent(Singleton): - def __init__(self, debug=False) -> None: - self.cluster = Cluster(CONFIG, ABS_PATH) - self.debug = debug - - def read_cluster_state(self) -> None: - try: - with open(os.path.join(ABS_PATH, RUNNING_STATE), "r") as f: - running_state = json.load(f) - except FileNotFoundError as e: - raise FileNotFoundError(f"Cluster state is not available by {RUNNING_STATE}") - for node in running_state: - if node not in CLUSTER_STATE: - continue - CLUSTER_STATE[node]["running"] = running_state[node]["running"] - - def sort_state(self) -> list: - sorted_state_list = sorted(CLUSTER_STATE.items(), key=lambda x: x[1]["running"]) - if self.debug: - print("Sorted cluster state") - pprint.pprint(sorted_state_list) - return sorted_state_list - - def get_node_port( - self, - input_image_number, - image_width, - image_height, - max_faces, - min_fps, - detector_size, - image, - ) -> Union[None, Tuple[Any, int]]: - hard_state = get_face_recognition_module_resource_usage( - USAGE_CONFIG, - input_image_number=input_image_number, - image_width=image_width, - image_height=image_height, - max_faces=max_faces, - min_fps=min_fps, - detector_size=detector_size, - ) - self.read_cluster_state() - sorted_state = self.sort_state() - for node in sorted_state: - future_sys_mem = ( - CLUSTER_STATE[node[0]]["hard_state"]["SYSTEM_MEMORY"] - + hard_state["SYSTEM_MEMORY"] - ) - future_dev_mem = ( - CLUSTER_STATE[node[0]]["hard_state"]["DEVICE_MEMORY"] - + hard_state["DEVICE_MEMORY"] - ) - future_node_cap = ( - CLUSTER_STATE[node[0]]["hard_state"]["CONTAINER_CAPACITY"] - + hard_state["CONTAINER_CAPACITY"] - ) - if ( - future_sys_mem < MAX_SYSTEM_MEMORY - and future_dev_mem < MAX_DEVICE_MEMORY - and future_node_cap < MAX_SCORE - ): - CLUSTER_STATE[node[0]]["soft_state"].append( - { - "image": image, - "detector_size": detector_size, - "max_faces": max_faces, - "min_fps": min_fps, - } - ) - CLUSTER_STATE[node[0]]["hard_state"] = hard_state - port = random.randint( - CLUSTER_STATE[node[0]]["min_port"], - CLUSTER_STATE[node[0]]["max_port"], - ) - while port in USED_PORTS: - port = random.randint( - CLUSTER_STATE[node[0]]["min_port"], - CLUSTER_STATE[node[0]]["max_port"], - ) - USED_PORTS.add(port) - if self.debug: - print("Future cluster state") - pprint.pprint(CLUSTER_STATE) - json_state = json.dumps(CLUSTER_STATE, indent=4) - with open(os.path.join(ABS_PATH, FULL_STATE), "w") as outfile: - outfile.write(json_state) - return node[0], port - else: - continue - - -if __name__ == "__main__": - agent = Agent() - node, port = agent.get_node_port( - input_image_number=8, - image_width=2560, - image_height=1440, - max_faces=5, - min_fps=12, - detector_size=640, - image="test", - ) - print(node, port) diff --git a/master-node/src/manager_utils.py b/master-node/src/manager_utils.py deleted file mode 100644 index c5eb31e..0000000 --- a/master-node/src/manager_utils.py +++ /dev/null @@ -1,157 +0,0 @@ -import asyncio 
-import configparser
-import logging
-from typing import List, Dict, Any, Optional, Tuple, Union
-
-# ============================= Batch identification ==========================
-
-import numpy as np
-import copy
-
-from detectors_stream_worker import DetectorsStreamWorker, DetectorsStreamWorkerManager
-from utils.base64_convector import base64_2_vector
-from face_identification import vector_identification
-import os
-
-CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini")
-CONFIG = configparser.ConfigParser()
-CONFIG.read(CONFIG_PATH)
-CONF_DATA = dict(CONFIG.items("MANAGER"))
-
-from db_tools import DataBaseManager
-
-app_logger = logging.getLogger('APP')
-
-
-def search_vectors_in_db(
-    lookupIds: List[str],
-    target,  # IDs of the requested vectors
-) -> Union[str, bytes, List[List[float]]]:
-    manager = DataBaseManager(CONFIG_PATH)
-    lookupIds_vectors = manager.get_vectors(lookupIds, target=target)
-    return lookupIds_vectors
-
-
-def batch_identification(
-    lookupIds: List[str],
-    faceVectorArchive: List[Dict[str, Any]],
-    recognitionThreshold: float,
-):
-    """
-    Searches for faces from the lookup database in faceVectorArchive.
-
-    Args:
-        lookupVectors (List[str]): list of face-vector IDs from the people-search database
-        against which the unknown faces must be compared. Format:
-        [
-            "de630dfa-158a-4d20-a903-ded922132124",
-            ...
-        ]
-        faceVectorArchive (List[Dict[str, Any]]): face vectors from video cameras that must be
-        compared against the people from the search database. Format:
-        [
-            {
-                vectorId: "de630dfa-158a-4d20-a903-ded922132124",
-                faceVector: base64 string (512 encoded floats)
-            }
-            ...
-        ]
-        recognitionThreshold (float): similarity threshold below which faces are considered different.
-
-    Return:
-        {
-            matches: [
-                {
-                    faceVectorId: "de630dfa-158a-4d20-a903-ded922132124", - ID of the archive
-                    vector on which the match was found
-                    vectorId: "de630dfa-158a-4d20-a903-ded922132124", - ID of the vector from
-                    the lookup database
-                    recognitionConfidence: float - confidence level (must be greater than the
-                    threshold for a detection to count as a match)
-                },
-                ...
-            ]
-        }
-    """
-    # Find the lookup-database vectors by lookupIds
-    lookupIds_vectors: Union[str, bytes, List[List[float]]] = search_vectors_in_db(
-        lookupIds, target="base_id"
-    )
-    # Build the lookup-database structure
-    lookupVectors: Dict[str, List[float]] = [
-        {"vectorId": name, "faceVector": vector}
-        for name, vector in zip(lookupIds, lookupIds_vectors)
-    ]
-
-    # Decode the vectors from the lookup databases
-    # Walk over every lookup database
-    faceVectorArchive = copy.deepcopy(faceVectorArchive)
-    for search_base in [lookupVectors, faceVectorArchive]:
-        # Walk over every record
-        for row in search_base:
-            # If the vector is a string or bytes, decode it
-            vector = row.get("faceVector")
-            if isinstance(vector, (str, bytes)):
-                row["faceVector"] = base64_2_vector(vector, dtype=np.float32).tolist()
-
-    # Call the search method
-    identification_result = vector_identification(
-        lookupVectors=lookupVectors,
-        faceVectorArchive=faceVectorArchive,
-        recognitionThreshold=recognitionThreshold,
-    )
-
-    # Assemble the result
-    result = [
-        {
-            "vectorId": row["vectorId"],
-            "faceVectorId": row["detectionId"],
-            "recognitionConfidence": row["recognitionConfidence"],
-        }
-        for row in identification_result
-    ]
-
-    return {"matches": result}
-
-
-def subscribe_to_detections(
-    stream: str,
-    detectionFrequency: int,
-    recognitionThreshold: float,
-    searchBaseIds: List[str],
-    bboxStream: Optional[str] = None,
-    eventsStream: Optional[str] = None,
-    bboxThreshold: Optional[float] = None,
-    detectionThreshold: Optional[float] = None,
-    controlArea: Optional[List[Tuple[int, int]]] = None,
-    minFaceSize: Optional[float] = None,
-    maxFacesPerFrame: Optional[int] = None,
-    enableAttributes: bool = False
-):
-    worker = DetectorsStreamWorker(
-        stream=stream,
-        detectionFrequency=detectionFrequency,
-        recognitionThreshold=recognitionThreshold,
-        searchBaseIds=searchBaseIds,
-        bboxStream=bboxStream,
-        eventsStream=eventsStream,
-        bboxThreshold=bboxThreshold,
-        detectionThreshold=detectionThreshold,
-        controlArea=controlArea,
-        minFaceSize=minFaceSize,
-        maxFacesPerFrame=maxFacesPerFrame,
-        enableAttributes=enableAttributes,
-    )
-    # Grab the async event loop
-    try:
-        loop = asyncio.get_running_loop()
-    except RuntimeError:
-        loop = asyncio.new_event_loop()
-    app_logger.debug('worker create')
-    try:
-        loop.run_until_complete(worker.create())
-    except Exception as e:
-        app_logger.debug(f'worker close with error: {e}')
-        loop.run_until_complete(worker.close())
-        raise e
-    else:
-        app_logger.debug('worker run')
-        DetectorsStreamWorkerManager.add(worker)