diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b504c84 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +simar-key +config.ini \ No newline at end of file diff --git a/master-node/Dockerfile b/master-node/Dockerfile new file mode 100644 index 0000000..ac32a74 --- /dev/null +++ b/master-node/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12-slim-bookworm:latest +COPY ./docker_requirements.txt /requirements.txt +RUN pip install -r /requirements.txt +COPY . /app +WORKDIR /app +ENTRYPOINT ["python"] +CMD ["src/app.py"] +# ENTRYPOINT ["sh", "src/entrypoint.sh"] # WSGI - gunicorn prod run \ No newline at end of file diff --git a/master-node/docker_requirements.txt b/master-node/docker_requirements.txt new file mode 100644 index 0000000..0423579 --- /dev/null +++ b/master-node/docker_requirements.txt @@ -0,0 +1,15 @@ +gunicorn == 21.2.0 +requests == 2.26.0 +httpx == 0.27.0 +requests-oauthlib == 1.3.0 +Flask == 3.0.2 +Flask-HTTPAuth == 4.8.0 +Flask-APScheduler == 1.13.1 +pandas == 1.4.2 +Werkzeug == 3.0.1 +paramiko == 3.4.0 +tabulate == 0.9.0 +psycopg2-binary == 2.9.9 +pydantic==2.6.3 +PyYAML == 6.0 +websockets==12.0 \ No newline at end of file diff --git a/master-node/requirements.txt b/master-node/requirements.txt new file mode 100644 index 0000000..9f09707 --- /dev/null +++ b/master-node/requirements.txt @@ -0,0 +1,17 @@ +gunicorn == 21.2.0 +requests == 2.26.0 +httpx == 0.27.0 +requests-oauthlib == 1.3.0 +Flask == 3.0.2 +Flask-HTTPAuth == 4.8.0 +Flask-APScheduler == 1.13.1 +numpy == 1.26.2 +pandas == 1.4.2 +Werkzeug == 3.0.1 +opencv-python-headless == 4.9.0.80 +paramiko == 3.4.0 +tabulate == 0.9.0 +psycopg2 == 2.9.9 +pydantic==2.6.3 +PyYAML == 6.0 +websockets==12.0 \ No newline at end of file diff --git a/master-node/src/app.py b/master-node/src/app.py new file mode 100644 index 0000000..fc5acaf --- /dev/null +++ b/master-node/src/app.py @@ -0,0 +1,214 @@ +import os +import logging +import configparser +import datetime +import json + +from flask import ( + Flask, + flash, + request, + redirect, + url_for, + render_template, + jsonify, + make_response, +) +from flask_httpauth import HTTPBasicAuth +from flask_apscheduler import APScheduler +from pydantic import ValidationError +from werkzeug.utils import secure_filename +from werkzeug.security import generate_password_hash, check_password_hash + +ABS_PATH = os.path.dirname(os.path.realpath(__file__)) +UPLOAD_FOLDER = os.path.join(ABS_PATH, "tmp") +ALLOWED_EXTENSIONS = {"csv", "mp4"} +MAX_CONTENT_PATH = 1000000000 +CONFIG = os.path.join(ABS_PATH, "config.ini") +SCHEDULER_API_ENABLED = True +SHOW_LOG = True + +from logger import LoggerFactory +# Настроим фабрику логгов +LoggerFactory.setting( + log_level=os.getenv("LOG_LEVEL", "INFO"), + log_format='[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s', + show=True, +) + +from cluster import Cluster +from manager_utils import batch_identification, subscribe_to_detections +from cluster_state import AutoState + + +app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates")) +app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER +app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH +app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED +app.config["SESSION_TYPE"] = "filesystem" + +auth = HTTPBasicAuth() +scheduler = APScheduler() +app_logger = LoggerFactory.get_logger('APP') +AUTO = AutoState(debug=False) + + +@scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900) +def cluster_state(): + AUTO.check_cluster_state() + # 
app_logger.debug("Finished with auto cluster state") + + + + + +def get_config() -> dict: + config = configparser.ConfigParser() + config.read(CONFIG) + auth_data = dict(config.items("API")) + return auth_data + + +USER = get_config()["user"] +PASSWORD = get_config()["password"] + +users = { + USER: generate_password_hash(PASSWORD), +} + + +@auth.verify_password +def verify_password(username, password): + if username in users and check_password_hash(users.get(username), password): + return username + + +def allowed_file(filename): + return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS + + +@app.route("/") +@auth.login_required +def home(): + return render_template("index.html") + + +@app.route("/faces") +def faces(): + return render_template("faces.html") + + +@app.route("/cluster", methods=["GET", "POST"]) +def cluster_activity(): + if request.method == "POST": + if request.form.get("get_state") == "Get state": + try: + cluster = Cluster(CONFIG, ABS_PATH) + command = ( + "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml" + ) + lines = cluster.run_ansible_command(command, use_configs=True) + state = "".join(lines).replace("\\n", "\n") + flash(state, category="success") + except Exception as e: + flash(f"Get state failed with: {e}", category="error") + app_logger.error(f"Get state failed with: {e}") + return redirect(request.url) + + if request.method == "GET": + return render_template("cluster.html") + + +@app.route("/faces/batch", methods=["GET", "POST"]) +def batch(): + if request.method == "POST": + lookupIds = request.form.get("lookupIds") + faceVectorArchive = request.form.get("faceVectorArchive") + recognitionThreshold = request.form.get("recognitionThreshold") + try: + matches = batch_identification( + lookupIds, faceVectorArchive, recognitionThreshold + ) + flash( + f"Batch identification match {matches}", + category="success", + ) + except Exception as e: + flash(f"Batch identification failed with error: {e}", category="error") + app_logger.error(f"Batch identification failed with error: {e}") + return redirect(request.url) + if request.method == "GET": + return render_template("batch.html") + +# curl -v -u : http://localhost:5010/faces/api/v1.0/get_cluster_state +@app.route("/api/faces/v1.0/get-cluster-state", methods=["GET"]) +@auth.login_required +def get_cluster_activity(): + try: + cluster = Cluster(CONFIG, ABS_PATH) + command = "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml" + state = cluster.run_ansible_command(command, use_configs=True) + time = datetime.datetime.now() + data = { + "message": state, + "code": "SUCCESS", + "insertTimestamp": time, + } + return make_response(jsonify(data), 200) + except Exception as e: + data = {"message": f"Get state failed with error: {e}", "code": "FAILED"} + app_logger.error(data) + return make_response(jsonify(data), 400) + + +# curl -v -u : -X POST -H "Content-type: application/json" -d @test_input.json http://localhost:5010/faces/api/v1.0/batch +@app.route("/api/faces/v1.0/batch", methods=["POST"]) +@auth.login_required +def batch_id(): + content_type = request.headers.get("Content-Type") + if content_type == "application/json": + if request.is_json: + data = request.json + lookupIds = data.get("lookupIds") + faceVectorArchive = data.get("faceVectorArchive") + recognitionThreshold = data.get("recognitionThreshold") + try: + matches = batch_identification( + lookupIds, faceVectorArchive, recognitionThreshold + ) + time = datetime.datetime.now() + data = { + "message": matches, 
+ "code": "SUCCESS", + "insertTimestamp": time, + } + return make_response(jsonify(data), 200) + except Exception as e: + data = { + "message": f"Batch identification failed with error: {e}", + "code": "FAILED", + } + app_logger.error(data) + return make_response(jsonify(data), 400) + else: + data = {"message": f"Invalid json: {request}", "code": "FAILED"} + return make_response(jsonify(data), 400) + else: + data = {"message": f"Wrong content type: {content_type}", "code": "FAILED"} + return make_response(jsonify(data), 400) + +def run_app(): + port = int(os.environ.get("PORT", 5010)) + app_logger.info(ABS_PATH) + app.secret_key = get_config()["key"] + mode = get_config()["mode"] + scheduler.init_app(app) + scheduler.start() + if mode == "debug": + app.run(debug=True, host="0.0.0.0", port=port) + elif mode == "prod": + app.run(port=port) + + +if __name__ == "__main__": + run_app() diff --git a/master-node/src/cluster.py b/master-node/src/cluster.py new file mode 100644 index 0000000..c28f693 --- /dev/null +++ b/master-node/src/cluster.py @@ -0,0 +1,108 @@ +import configparser +import json +import pandas as pd +import paramiko +import os +from tabulate import tabulate + + +class Cluster: + def __init__(self, config_path: str, abs_path: str) -> None: + self.abs_path = abs_path + self.config = configparser.ConfigParser() + self.config.read(config_path) + self.auth_data = dict(self.config.items("CLUSTER")) + self.ssh = paramiko.SSHClient() + + def connect(self) -> None: + k = paramiko.RSAKey.from_private_key_file( + os.path.join(self.abs_path, self.auth_data["key"]) + ) + host = self.auth_data["host"] + port = self.auth_data["port"] + self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + # self.ssh.load_system_host_keys() + # self.ssh.get_host_keys().add(host, "ssh-rsa", k) + self.ssh.connect( + hostname=self.auth_data["host"], + username=self.auth_data["user"], + pkey=k, + port=self.auth_data["port"], + ) + + def get_state(self) -> list: + self.connect() + get_state = "docker node ls" + try: + ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(get_state) + except Exception as e: + print(ssh_stderr) + print(e) + return 1 + ssh_stdout.channel.set_combine_stderr(True) + output = ssh_stdout.readlines() + self.ssh.close() + return output + + def run_ansible_command(self, command: str, use_configs=False) -> list: + self.connect() + if use_configs: + command = "cd sinapse-configs/nodes/ansible && " + command + try: + ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(command) + ssh_stdout.channel.set_combine_stderr(True) + output = ssh_stdout.readlines() + except Exception as e: + print(ssh_stderr) + print(e) + output = ssh_stderr.readlines() + + self.ssh.close() + # formatted_output = "".join(output) + # return json.loads(formatted_output) + return output + + def format_state(self, as_json=False) -> str: + state = self.get_state() + keys = state[0].split() + f_state = [] + try: + for node in state[1:-1]: + buffer = {} + line = node.split() + for i, j in zip(line, keys): + buffer.update({j: i}) + f_state.append(buffer) + df = pd.DataFrame(f_state) + df = df.rename(columns={"MANAGER": "ENGINE"}) + df["MANAGER"] = "" + leader = state[-1].split() + df2 = pd.DataFrame( + { + "ID": leader[0], + "HOSTNAME": leader[2], + "STATUS": leader[3], + "AVAILABILITY": leader[4], + "ENGINE": leader[6], + "MANAGER": leader[5], + }, + index=[0], + ) + df = pd.concat([df, df2], ignore_index=True) + print(tabulate(df, headers="keys", tablefmt="psql")) + if not as_json: + return 
df.to_json(orient="records") + else: + # return tabulate(df, headers="keys", tablefmt="psql") + return df.to_dict("records") + except Exception as e: + print(e) + return "Format failed" + + +if __name__ == "__main__": + cluster = Cluster( + os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini"), + os.path.dirname(os.path.realpath(__file__)), + ) + print(cluster.format_state()) diff --git a/master-node/src/cluster_state.py b/master-node/src/cluster_state.py new file mode 100644 index 0000000..7876214 --- /dev/null +++ b/master-node/src/cluster_state.py @@ -0,0 +1,80 @@ +import json +import pprint +import os +import string +import time + +from cluster import Cluster +from manager_agent import CLUSTER_STATE, NODES + +ABS_PATH = os.path.dirname(os.path.realpath(__file__)) +CONFIG = os.path.join(ABS_PATH, "config.ini") +JSON_FILE = "cluster_state_running_only.json" + + +class AutoState: + def __init__(self, debug=False) -> None: + self.cluster = Cluster(CONFIG, ABS_PATH) + self.debug = debug + + def check_cluster_state(self, per_node=False, current_state=None) -> None: + start_time = time.time() + if current_state: + state = current_state + else: + state = CLUSTER_STATE + if per_node: + for node in state: + command = ( + f"ansible-playbook -i hosts/{node}.yml playbooks/check_module.yml" + ) + cluster_state = self.cluster.run_ansible_command( + command, use_configs=True + ) + start = cluster_state.index( + "TASK [../roles/check : Display running info] ***********************************\n" + ) + running = cluster_state[start + 3] + running = running.translate({ord(c): None for c in string.whitespace}) + running = running.replace('"', "") + state[node]["running"] = int(running) + print(node) + pprint.pprint(state[node]) + else: + command = "ansible-playbook -i hosts/nodes.yml playbooks/check_module.yml" + cluster_state = self.cluster.run_ansible_command(command, use_configs=True) + start = cluster_state.index( + "TASK [../roles/check : Display running info] ***********************************\n" + ) + i, j = 1, 3 + for x in range(0, NODES): + node = cluster_state[start + i] + node = node[node.find("[") + 1 : node.find("]")] + if node not in state: + continue + running = cluster_state[start + j] + running = running.translate({ord(c): None for c in string.whitespace}) + running = running.replace('"', "") + state[node]["running"] = int(running) + i += 5 + j += 5 + # pprint.pprint( + # ">>> Got cluster state by %s seconds" % (time.time() - start_time) + # ) + start_time = time.time() + json_state = json.dumps(state, indent=4) + with open(os.path.join(ABS_PATH, JSON_FILE), "w") as outfile: + outfile.write(json_state) + # pprint.pprint( + # ">>> Saved cluster state by %s seconds" % (time.time() - start_time), + # ) + # print(f">>> Saved cluster state to {JSON_FILE}") + if self.debug: + pprint.pprint(CLUSTER_STATE) + if current_state: + return state + + +if __name__ == "__main__": + auto = AutoState() + auto.check_cluster_state() diff --git a/master-node/src/docker-gunicorn.conf.py b/master-node/src/docker-gunicorn.conf.py new file mode 100644 index 0000000..5793aca --- /dev/null +++ b/master-node/src/docker-gunicorn.conf.py @@ -0,0 +1,2 @@ +accesslog = "/logs/access.log" +errorlog = "/logs/error.log" diff --git a/master-node/src/load_balancer.py b/master-node/src/load_balancer.py new file mode 100644 index 0000000..f31ebb8 --- /dev/null +++ b/master-node/src/load_balancer.py @@ -0,0 +1,547 @@ +import argparse +import errno +import os +import select +import socket +import sys + +""" 
+There are hot and cold nodes.
+At startup the 6 hot nodes are already running 6 containers, 1 per node. The cold nodes only have the image installed, with nothing running.
+We then allow a maximum of 12 containers per node.
+When requests exceed the current number of running containers, containers are started on the hot nodes, up to 6 containers per node (36).
+As the load keeps growing, the cold nodes are brought in, likewise up to 6 containers per node (30).
+After that, in the same way, the hot nodes are scaled up to 12 containers per node (72), and then the cold ones (60).
+If the incoming request stream constantly exceeds the number of running containers (132), we drop messages, i.e. the cluster is saturated.
+If the load drops, containers on the hot nodes keep running for another 10 minutes and are then scaled back down to 1 per node. On the cold nodes everything is shut down to 0 immediately.
+
+All of this is implemented via functions that continuously poll the cluster state, persist it, and keep counters of running containers tagged with the module label.
+"""
+"""
+STATIC BALANCING
+11 nodes in the cluster: 6 hot, 5 cold.
+Evaluate how the number of faces affects the maximum allowed load values.
+MAX CONTAINERS = 176 TODO - const value - find max values - ?
+MAX PER TYPE (1-16, 2-14, 3-12, 4-10) TODO - const value - find max values - ?
+Add a container blacklist - ?
+ VS
+Parameter filtering - ?
+
+0. 6 containers on the hot nodes 1:1, 0 containers on the cold nodes
+1. Accept a subscription request from Grada
+2. Determine the detector type (4 types: 1-2-3-4)
+3. Determine the current load of the hot nodes (direct command execution on the master)
+4. If half of the limit has not been reached (6 containers * 6 nodes = 36), use the running containers
+and start new ones with a different detector type (argument via ENV)
+5. Once the half-limit of 36 containers is reached, the cold nodes start being loaded in the same way as step 4 (5 * 6 = 30)
+6. Once the half-limit of 36 + 30 = 66 containers is reached, start loading the hot nodes as in step 4
+7. 
По аналогии с холодными нодами до 132 (при 1 типе), убывание максимума обратно пропорционально +кол-во запущенных модулей с мощным детектором +""" + + +class LoadBalancer: + sendbuffer = 4096 + maxbuffersize = 2**16 + maxservers = 132 + + def __init__( + self, + host="0.0.0.0", + port=80, + targets=[], + fallback=None, + check_time=None, + debug=False, + family=socket.AF_INET, + typ=socket.SOCK_STREAM, + ): + assert isinstance(host, str), ValueError("Expected string as host-argument") + assert isinstance(port, int), ValueError("Expected int as port-argument") + assert isinstance(targets, list), ValueError( + "Expected list as targets-arguments" + ) + assert isinstance(fallback, tuple) or (fallback is None), ValueError( + "Expected tuple or None as fallback-argument" + ) + assert ( + isinstance(check_time, int) + or isinstance(check_time, float) + or (check_time is None) + ), ValueError("Expected int,long or float as check_time-argument") + assert check_time >= 0 or (check_time is None), ValueError( + "Expected positive integer as check_time-argument" + ) + assert isinstance(debug, bool), ValueError("Expected bool as debug-argument") + for addr in targets: + assert len(addr) == 2, ValueError( + "Each address in targets needs to be a tuple of length 2" + ) + assert isinstance(addr[0], str), ValueError( + "Each address in targets needs to have a str at index 0" + ) + assert isinstance(addr[1], int), ValueError( + "Each address in targets needs to have a int at index 1" + ) + assert addr[1] > 0, ValueError( + "Each address in targets needs to have a integer larger 0 as port" + ) + self.host = host + self.port = port + self.targets = targets + self.fallback = fallback + self.check_time = check_time + self.debug = debug + self.family = family + self.type = typ + self.o2i = {} + self.i2o = {} + self.s2b = {} + self.o2t = {} + self.t2n = {} + self.running = False + self.bind_and_listen() + + def bind_and_listen(self): + if self.debug: + print("binding...") + self.listen_s = socket.socket(self.family, self.type, 0) + self.listen_s.bind((self.host, self.port)) + if self.debug: + print("bound to: {host}:{port}".format(host=self.host, port=self.port)) + try: + self.listen_s.listen(3) + except: + self.listen_s.listen(1) + + def mainloop(self): + if self.debug: + print("entering mainloop.") + self.running = True + try: + while self.running: + checkread = checkex = ( + [self.listen_s] + self.i2o.keys() + self.o2i.keys() + ) + checkwrite = filter( + None, [s for s in self.s2b.keys() if len(self.s2b[s]) > 0] + ) + toread, towrite, exceptional = select.select( + checkread, checkwrite, checkex, self.check_time + ) + if self.debug: + print("data avaible on: ", toread) + print("buffer free on: ", towrite) + print("errors on: ", exceptional) + if len(toread) == 0 and len(towrite) == 0 and len(exceptional) == 0: + continue + for s in exceptional: + if s is self.listen_s: + if self.debug: + print("error in listening socket!") + self.running = False + raise RuntimeError("Error in listening socket!") + elif s in self.o2i: + if self.debug: + print("error in client connection: closing socket.") + self.close_s(s) + else: + s2c = s + c2s = self.i2o[s2c] + if c2s in self.o2t: + old_peer = self.o2t[c2s] + if old_peer == self.fallback or (self.fallback is None): + old_peer = None + else: + old_peer = None + if old_peer is None: + if self.debug: + print("fallback not possible, closing s") + self.close_s(c2s) + else: + if self.debug: + print( + "error in connection to normal server, instead connecting to fallback-server." 
+ "" + ) + if self.debug: + print("unregistering old peer") + try: + s2c.shutdown(socket.SHUT_RDWR) + except: + pass + try: + s2c.close() + except: + pass + if s2c in self.s2b: + old_buff = self.s2b[s2c] + if self.debug: + print( + "redirecting {n} bytes to fallback server".format( + n=len(old_buff) + ) + ) + del self.s2b[s2c] + else: + old_buff = "" + if s2c in self.i2o: + del self.i2o[s2c] + if c2s in self.o2t: + t = self.o2t[c2s] + if t in self.t2n: + self.t2n[t] = max(0, self.t2n[t] - 1) + self.o2t[c2s] = self.fallback + peer = socket.socket(self.family, self.type, 0) + peer.setblocking(0) + self.s2b[peer] = old_buff + self.o2i[c2s] = peer + self.i2o[peer] = c2s + self.o2t[c2s] = self.fallback + try: + err = peer.connect_ex(self.fallback) + except Exception as e: + try: + err = e.args[0] + except: + err = None + if ( + err == errno.EWOULDBLOCK + or err == errno.WSAEWOULDBLOCK + ): + pass + else: + if self.debug: + print("error during connect to fallback:", e) + self.close_s(c2s) + continue + if err == errno.EINPROGRESS or err == errno.WSAEWOULDBLOCK: + pass + else: + if self.debug: + print( + "error during connet to fallback:", + errno.errorcode[err], + ) + self.close_s(c2s) + for s in towrite: + if s not in self.s2b: + continue + if len(self.s2b[s]) < self.sendbuffer: + tosend = self.s2b[s] + else: + tosend = self.s2b[s][: self.sendbuffer] + if self.debug: + print( + "sending {n} bytes (left: {t} bytes)".format( + n=len(tosend), t=len(self.s2b[s]) - len(tosend) + ) + ) + try: + sent = s.send(tosend) + except socket.error as e: + if self.debug: + print("error writing buffer:", e) + self.close_s(s) + continue + if self.debug: + print("sent {n} bytes.".format(n=sent)) + if sent >= len(self.s2b[s]): + self.s2b[s] = "" + self.s2b[s] = self.s2b[s][sent:] + for s in toread: + if s is self.listen_s: + if self.debug: + print("got request") + if len(self.targets) == 0: + if self.debug: + print("using fallback server") + target = self.fallback + else: + c = 9999999 + target = None + for ta in targets: + if ta not in self.t2n: + self.t2n[ta] = 0 + n = self.t2n[ta] + if n < c: + c = n + target = ta + if target is None: + if self.debug: + print("cannot find a target!") + continue + if target in self.t2n: + self.t2n[target] += 1 + else: + self.t2n[target] = 1 + if self.debug: + print("target is:", target) + new_s, addr = self.listen_s.accept() + new_s.setblocking(0) + peer = socket.socket(self.family, self.type, 0) + peer.setblocking(0) + self.s2b[new_s] = "" + self.s2b[peer] = "" + self.o2i[new_s] = peer + self.i2o[peer] = new_s + self.o2t[new_s] = target + try: + err = peer.connect_ex(target) + except Exception as e: + try: + err = e.args[0] + except: + err = None + if err == errno.EWOULDBLOCK or err == errno.WSAEWOULDBLOCK: + pass + else: + if self.debug: + print("error during connect to target:", e) + self.close_s(new_s) + continue + if err == errno.EINPROGRESS or err == errno.WSAEWOULDBLOCK: + pass + else: + if self.debug: + print( + "error during connet to target:", + errno.errorcode[err], + ) + self.close_s(new_s) + else: + if s in self.i2o: + p = self.i2o[s] + elif s in self.o2i: + p = self.o2i[s] + else: + if self.debug: + print("a socket has no peer registered, closing it.") + self.close_s(s) + continue + try: + peerbufferlength = len(self.s2b[p]) + except KeyError: + peerbufferlength = 0 + self.s2b[p] = "" + if peerbufferlength < self.maxbuffersize: + maxread = self.maxbuffersize - peerbufferlength + try: + data = s.recv(maxread) + except socket.error as e: + err = e.args[0] + if err == 
errno.EAGAIN or err == errno.EWOULDBLOCK: + continue + else: + if self.debug: + print("error while receiving:", e) + self.close_s(s) + continue + if self.debug: + print("reveived {n} bytes.".format(n=len(data))) + if len(data) == 0: + if self.debug: + print("connection closed.") + self.close_s(s) + else: + self.s2b[p] += data + else: + continue + finally: + if self.debug: + print("closing all connections...") + try: + self.listen_s.close() + except: + pass + for s in self.o2i.keys() + self.i2o.keys(): + self.close_s(s) + + def close_s(self, s): + try: + s.shutdown(socket.SHUT_RDWR) + except: + pass + try: + s.close() + except: + pass + if s in self.s2b: + del self.s2b[s] + if s in self.o2t: + t = self.o2t[s] + del self.o2t[s] + if t in self.t2n: + self.t2n[t] = max(self.t2n[t] - 1, 0) + if s in self.o2i: + p = self.o2i[s] + del self.o2i[s] + self.close_s(p) + if s in self.i2o: + p = self.i2o[s] + del self.i2o[s] + self.close_s(p) + + def add_target(self, addr): + assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument") + assert len(addr) == 2, ValueError( + "Expected a tuple of length 2 as addr-argument" + ) + assert isinstance(addr[0], str), ValueError( + "Expected addr-argument to have a string at index 0" + ) + assert isinstance(addr[1], str), ValueError( + "Expected addr-argument to have a integer at index 1" + ) + if addr in self.targets: + raise ValueError("Target already registered!") + else: + self.targets.append(addr) + + def remove_target(self, addr): + assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument") + assert len(addr) == 2, ValueError( + "Expected a tuple of length 2 as addr-argument" + ) + assert isinstance(addr[0], str), ValueError( + "Expected addr-argument to have a string at index 0" + ) + assert isinstance(addr[1], str), ValueError( + "Expected addr-argument to have a integer at index 1" + ) + if addr in self.targets: + self.targets.remove(addr) + else: + raise ValueError("Target not found!") + + def stop(self): + if not self.running: + raise RuntimeError("Server not running!") + self.running = False + self.check_time = 0 + + +def load_from_file(path, ignore_errors=False): + targets = [] + with open(path, "rU") as f: + for line in f: + try: + if line.startswith("#"): + continue + if line.count(":") != 1: + raise SyntaxError( + "Error in line '{l}': expected exactly one ':'!".format(l=line) + ) + host, port = line.split(":") + try: + port = int(port) + except ValueError: + raise SyntaxError( + "Error in line '{l}': cannot convert port to int!".format( + l=line + ) + ) + if len(host) == 0: + raise SyntaxError( + "Error in line '{l}': invalid host format!".format(l=line) + ) + targets.append((host, port)) + except SyntaxError: + if not ignore_errors: + raise + return targets + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A Load-Balancer") + parser.add_argument("host", help="host/ip to bind to") + parser.add_argument("port", type=int, help="port to bind to") + parser.add_argument( + "-d", action="store_true", dest="debug", help="enable debug-mode" + ) + parser.add_argument( + "-p", + action="store", + dest="path", + help="load target list from file", + required=False, + default=None, + ) + parser.add_argument( + "-f", + action="store", + dest="fallback", + help="target to relay connections to if no server are aviable", + required=False, + default=None, + ) + parser.add_argument( + "-t", + action="store", + dest="targets", + help="targets to spread connections to", + required=False, + 
default=None, + nargs="+", + ) + ns = parser.parse_args() + if ns.targets is None: + targets = [] + else: + targets = [] + for ta in ns.targets: + if ta.count(":") != 1: + print( + "SyntaxError in command-line target-list: expected exactly one ':'!" + ) + sys.exit(1) + host, port = ta.split(":") + if len(host) == 0: + print("SyntaxError in command-line target-list: invalid host!") + sys.exit(1) + try: + port = int(port) + if port <= 0: + raise ValueError + except ValueError: + print("SyntaxError in command-line target-list: invalid port!") + sys.exit(1) + targets.append((host, port)) + if ns.fallback is not None: + if ns.fallback.count(":") != 1: + print("SyntaxError in fallback-argument: expected exactly one ':'!") + sys.exit(1) + host, port = ns.fallback.split(":") + if len(host) == 0: + print("SyntaxError in fallback-argument: invalid host!") + sys.exit(1) + try: + port = int(port) + if port <= 0: + raise ValueError + except ValueError: + print("SyntaxError in fallback-argument: invalid port!") + sys.exit(1) + fallback = (host, port) + else: + fallback = None + if ns.path is None: + pass + elif not os.path.isfile(ns.path): + print("Error: File not found!") + sys.exit(1) + else: + targets += load_from_file(ns.path, False) + if len(targets) == 0: + print("Error: no targets found!") + sys.exit(1) + lb = LoadBalancer(ns.host, ns.port, targets, fallback, None, ns.debug) + try: + lb.mainloop() + except (KeyboardInterrupt, SystemExit) as e: + pass + finally: + try: + lb.stop() + except RuntimeError as e: + pass diff --git a/master-node/src/logger.py b/master-node/src/logger.py new file mode 100644 index 0000000..5c67858 --- /dev/null +++ b/master-node/src/logger.py @@ -0,0 +1,77 @@ +import logging +import os +import sys +from typing import Union + +LOG_FILE = os.path.join(os.getcwd(), "logfile.log") + + +class LoggerFactory: + """ + Class for logging behaviour of data exporting - object of ExportingTool class + """ + log_level = logging.INFO + formatter = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s") + show = True + + @classmethod + def setting(cls, + log_level: Union[str, int] = logging.INFO, + log_format: str = "%(asctime)s — %(name)s — %(levelname)s — %(message)s", + show: bool = True) -> None: + """ + Re-defined __init__ method which sets show parametr + + Args: + log_level (str, int): logging level as string or logging. 
(like logging.INFO) + log_format: (str): logging format of saved massage + show (bool): if set all logs will be shown in terminal + """ + if isinstance(log_level, str): + log_level: int = logging.getLevelName(log_level) + cls.show = show + cls.log_level = log_level + cls.formatter = logging.Formatter(log_format) + + @classmethod + def get_console_handler(cls) -> logging.StreamHandler: + """ + Class method the aim of which is getting a console handler to show logs on terminal + + Returns: + logging.StreamHandler: handler object for streaming output through terminal + """ + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(cls.formatter) + return console_handler + + @classmethod + def get_file_handler(cls) -> logging.FileHandler: + """ + Class method the aim of which is getting a file handler to write logs in file LOG_FILE + + Returns: + logging.FileHandler: handler object for streaming output through std::filestream + """ + file_handler = logging.FileHandler(LOG_FILE, mode="w") + file_handler.setFormatter(cls.formatter) + return file_handler + + @classmethod + def get_logger(cls, logger_name: str): + """ + Class method which creates logger with certain name + + Args: + logger_name (str): name for logger + + Returns: + logger: object of Logger class + """ + logger = logging.getLogger(logger_name) + logger.setLevel(cls.log_level) + if cls.show: + logger.addHandler(cls.get_console_handler()) + logger.addHandler(cls.get_file_handler()) + logger.propagate = False + return logger diff --git a/master-node/src/manager_agent.py b/master-node/src/manager_agent.py new file mode 100644 index 0000000..89429b6 --- /dev/null +++ b/master-node/src/manager_agent.py @@ -0,0 +1,228 @@ +import json +import os +import pprint +import random +import string +from typing import Union, Tuple, Any + +from module_size_calculator import get_face_recognition_module_resource_usage +from cluster import Cluster +from utils.patterns import Singleton + +MAX_SYSTEM_MEMORY = 3000 # Mb +MAX_DEVICE_MEMORY = 7500 # Mb +MAX_SCORE = 110 # % + +# MIN_PORT = 1000 +# MAX_PORT = 2000 +USED_PORTS = set() + +NODES = 11 + +ABS_PATH = os.path.dirname(os.path.realpath(__file__)) +CONFIG = os.path.join(ABS_PATH, "config.ini") +USAGE_CONFIG = os.path.join(ABS_PATH, "docker_module_resource_usage.yaml") +RUNNING_STATE = "cluster_state_running_only.json" +FULL_STATE = "cluster_state_full.json" +SOFT_STATE = {"image": "", "detector_size": "", "max_faces": "", "min_fps": ""} + +HARD_STATE = { + "SYSTEM_MEMORY": 0, + "DEVICE_MEMORY": 0, + "CONTAINER_CAPACITY": 0, + "CPU_USAGE": None, + "DEVICE_USAGE": None, +} + +CLUSTER_STATE = { + # "1-node": { + # "soft_state": [], + # "hard_state": HARD_STATE, + # "running": 0, + # "min_port": 1024, + # "max_port": 1224, + # }, + "2-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 2024, + "max_port": 2224, + }, + "3-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 3024, + "max_port": 3224, + }, + "4-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 4024, + "max_port": 4224, + }, + "5-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 5024, + "max_port": 5224, + }, + "6-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 6024, + "max_port": 6224, + }, + "7-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 7024, + "max_port": 7224, + }, + "8-node": { + 
"soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 8024, + "max_port": 8224, + }, + "9-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 9024, + "max_port": 9224, + }, + "10-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 10024, + "max_port": 10224, + }, + "11-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 11024, + "max_port": 11224, + }, + "12-node": { + "soft_state": [], + "hard_state": HARD_STATE, + "running": 0, + "min_port": 12024, + "max_port": 12224, + }, +} + + +class Agent(Singleton): + def __init__(self, debug=False) -> None: + self.cluster = Cluster(CONFIG, ABS_PATH) + self.debug = debug + + def read_cluster_state(self) -> None: + try: + with open(os.path.join(ABS_PATH, RUNNING_STATE), "r") as f: + running_state = json.load(f) + except FileNotFoundError as e: + raise FileNotFoundError(f"Cluster state is not available by {RUNNING_STATE}") + for node in running_state: + if node not in CLUSTER_STATE: + continue + CLUSTER_STATE[node]["running"] = running_state[node]["running"] + + def sort_state(self) -> list: + sorted_state_list = sorted(CLUSTER_STATE.items(), key=lambda x: x[1]["running"]) + if self.debug: + print("Sorted cluster state") + pprint.pprint(sorted_state_list) + return sorted_state_list + + def get_node_port( + self, + input_image_number, + image_width, + image_height, + max_faces, + min_fps, + detector_size, + image, + ) -> Union[None, Tuple[Any, int]]: + hard_state = get_face_recognition_module_resource_usage( + USAGE_CONFIG, + input_image_number=input_image_number, + image_width=image_width, + image_height=image_height, + max_faces=max_faces, + min_fps=min_fps, + detector_size=detector_size, + ) + self.read_cluster_state() + sorted_state = self.sort_state() + for node in sorted_state: + future_sys_mem = ( + CLUSTER_STATE[node[0]]["hard_state"]["SYSTEM_MEMORY"] + + hard_state["SYSTEM_MEMORY"] + ) + future_dev_mem = ( + CLUSTER_STATE[node[0]]["hard_state"]["DEVICE_MEMORY"] + + hard_state["DEVICE_MEMORY"] + ) + future_node_cap = ( + CLUSTER_STATE[node[0]]["hard_state"]["CONTAINER_CAPACITY"] + + hard_state["CONTAINER_CAPACITY"] + ) + if ( + future_sys_mem < MAX_SYSTEM_MEMORY + and future_dev_mem < MAX_DEVICE_MEMORY + and future_node_cap < MAX_SCORE + ): + CLUSTER_STATE[node[0]]["soft_state"].append( + { + "image": image, + "detector_size": detector_size, + "max_faces": max_faces, + "min_fps": min_fps, + } + ) + CLUSTER_STATE[node[0]]["hard_state"] = hard_state + port = random.randint( + CLUSTER_STATE[node[0]]["min_port"], + CLUSTER_STATE[node[0]]["max_port"], + ) + while port in USED_PORTS: + port = random.randint( + CLUSTER_STATE[node[0]]["min_port"], + CLUSTER_STATE[node[0]]["max_port"], + ) + USED_PORTS.add(port) + if self.debug: + print("Future cluster state") + pprint.pprint(CLUSTER_STATE) + json_state = json.dumps(CLUSTER_STATE, indent=4) + with open(os.path.join(ABS_PATH, FULL_STATE), "w") as outfile: + outfile.write(json_state) + return node[0], port + else: + continue + + +if __name__ == "__main__": + agent = Agent() + node, port = agent.get_node_port( + input_image_number=8, + image_width=2560, + image_height=1440, + max_faces=5, + min_fps=12, + detector_size=640, + image="test", + ) + print(node, port) diff --git a/master-node/src/manager_utils.py b/master-node/src/manager_utils.py new file mode 100644 index 0000000..c5eb31e --- /dev/null +++ b/master-node/src/manager_utils.py @@ -0,0 +1,157 @@ +import asyncio +import 
configparser +import logging +from typing import List, Dict, Any, Optional, Tuple, Union + +# ============================= Batch identification ========================== + +import numpy as np +import copy + +from detectors_stream_worker import DetectorsStreamWorker, DetectorsStreamWorkerManager +from utils.base64_convector import base64_2_vector +from face_identification import vector_identification +import os + +CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini") +CONFIG = configparser.ConfigParser() +CONFIG.read(CONFIG_PATH) +CONF_DATA = dict(CONFIG.items("MANAGER")) + +from db_tools import DataBaseManager + +app_logger = logging.getLogger('APP') + +def search_vectors_in_db( + lookupIds: List[str], + target, # id-шники требуемых векторов +) -> Union[str, bytes, List[List[float]]]: + manager = DataBaseManager(CONFIG_PATH) + lookupIds_vectors = manager.get_vectors(lookupIds, target=target) + return lookupIds_vectors + + +def batch_identification( + lookupIds: List[str], + faceVectorArchive: List[Dict[str, Any]], + recognitionThreshold: float, +): + """ + Функция поиска лиц из базы розыска в faceVectorArchive. + + Args: + lookupVectors (List[str]): список id-шников векторов лиц из базы поиска людей, + с которыми необходимо сравнивать незнакомые лица. Формат: + [ + “de630dfa-158a-4d20-a903-ded922132124”, + ... + ] + faceVectorArchive (List[Dict[str, Any]]): вектора лиц с видеокамер, которые необходимо сравнить + с людьми из базы поиска. Формат: + [ + { + vectorId: “de630dfa-158a-4d20-a903-ded922132124”, + faceVector: base64-строка (закодированные 512 флотов) + } + ... + ] + recognitionThreshold (float): пороговое значение сходства, меньше которого лица будут считаться различными. + + Return: + { + matches: [ + { + faceVectorId: “de630dfa-158a-4d20-a903-ded922132124”, - id вектора из архива, + на котором нашлось совпадение + vectorId: “de630dfa-158a-4d20-a903-ded922132124”, - id вектора из базы розыска + recognitionConfidence: float - уровень уверенности (должно быть больше чем threshold, + чтобы детекция попала в совпадения) + }, + ... 
+ ] + } + """ + # Ищем вектора в базе розыска по lookupIds + lookupIds_vectors: Union[str, bytes, List[List[float]]] = search_vectors_in_db( + lookupIds, target="base_id" + ) + # Формируем структуру базы розыска + lookupVectors: Dict[str, List[float]] = [ + {"vectorId": name, "faceVector": vector} + for name, vector in zip(lookupIds, lookupIds_vectors) + ] + + # Раскодируем вектора из баз розыска + # Пройдёмся по всем базам розыска + faceVectorArchive = copy.deepcopy(faceVectorArchive) + for search_base in [lookupVectors, faceVectorArchive]: + # Пройдёмся по каждой записи + for row in search_base: + # Если вектор - строка или байты - раскодируем + vector = row.get("faceVector") + if isinstance(vector, (str, bytes)): + row["faceVector"] = base64_2_vector(vector, dtype=np.float32).tolist() + + # Вызываем метод поиска + identification_result = vector_identification( + lookupVectors=lookupVectors, + faceVectorArchive=faceVectorArchive, + recognitionThreshold=recognitionThreshold, + ) + + # Сформируем результат + result = [ + { + "vectorId": row["vectorId"], + "faceVectorId": row["detectionId"], + "recognitionConfidence": row["recognitionConfidence"], + } + for row in identification_result + ] + + return {"matches": result} + + +def subscribe_to_detections( + stream: str, + detectionFrequency: int, + recognitionThreshold: float, + searchBaseIds: List[str], + bboxStream: Optional[str] = None, + eventsStream: Optional[str] = None, + bboxThreshold: Optional[float] = None, + detectionThreshold: Optional[float] = None, + controlArea: Optional[List[Tuple[int, int]]] = None, + minFaceSize: Optional[float] = None, + maxFacesPerFrame: Optional[int] = None, + enableAttributes: bool = False +): + worker = DetectorsStreamWorker( + stream=stream, + detectionFrequency=detectionFrequency, + recognitionThreshold=recognitionThreshold, + searchBaseIds=searchBaseIds, + bboxStream=bboxStream, + eventsStream=eventsStream, + bboxThreshold=bboxThreshold, + detectionThreshold=detectionThreshold, + controlArea=controlArea, + minFaceSize=minFaceSize, + maxFacesPerFrame=maxFacesPerFrame, + enableAttributes=enableAttributes, + ) + # Получим асинхронный цикл + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + app_logger.debug('worker create') + try: + loop.run_until_complete(worker.create()) + except Exception as e: + app_logger.debug(f'worker close with error: {e}') + loop.run_until_complete(worker.close()) + raise e + else: + app_logger.debug('worker run') + DetectorsStreamWorkerManager.add(worker)
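config.ini is deliberately gitignored, so its layout is not part of this diff. Below is a minimal sketch of what the code appears to expect, inferred from the sections and keys read in app.py ([API]), cluster.py ([CLUSTER]) and manager_utils.py ([MANAGER]); every value is a placeholder, and the keys of the MANAGER section are not visible in this change set.

# Hypothetical config.ini layout inferred from the sections/keys read in this diff:
#   app.py           -> config.items("API"):     user, password, key (Flask secret), mode (debug|prod)
#   cluster.py       -> config.items("CLUSTER"): host, port, user, key (private key path, resolved relative to master-node/src)
#   manager_utils.py -> config.items("MANAGER") (keys not shown in this diff)
# All values below are placeholders, not real credentials.
import configparser

SAMPLE_CONFIG = """
[API]
user = admin
password = change-me
key = flask-secret-key
mode = debug

[CLUSTER]
host = 10.0.0.1
port = 22
user = deploy
key = simar-key

[MANAGER]
; keys for this section are read by manager_utils.py but are not visible in this diff
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CONFIG)
print(dict(config.items("API")))
print(dict(config.items("CLUSTER")))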
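The commented curl examples above still point at /faces/api/v1.0/..., while the routes registered in app.py are /api/faces/v1.0/get-cluster-state and /api/faces/v1.0/batch. Below is a minimal client sketch against the registered routes, assuming the service runs locally on the default port 5010 and that faceVector is a base64 string of 512 raw float32 values (as the batch_identification docstring and the base64_2_vector call suggest); the credentials and vector ids are placeholders.

# Hypothetical client for the routes registered in app.py; user/password must match
# the [API] section of config.ini, and the vector ids below are made up.
import base64

import numpy as np
import requests

BASE_URL = "http://localhost:5010"     # PORT env var defaults to 5010 in run_app()
AUTH = ("admin", "change-me")          # HTTP basic auth checked by verify_password()

# Assumption: faceVector is raw float32[512] bytes, base64-encoded.
fake_vector = base64.b64encode(np.random.rand(512).astype(np.float32).tobytes()).decode()

payload = {
    "lookupIds": ["de630dfa-158a-4d20-a903-ded922132124"],
    "faceVectorArchive": [
        {"vectorId": "11111111-2222-3333-4444-555555555555", "faceVector": fake_vector}
    ],
    "recognitionThreshold": 0.7,
}

# Cluster state (GET) and batch identification (POST with a JSON body, which is
# what the Content-Type check in batch_id() requires).
state = requests.get(f"{BASE_URL}/api/faces/v1.0/get-cluster-state", auth=AUTH)
print(state.status_code, state.json()["code"])

resp = requests.post(f"{BASE_URL}/api/faces/v1.0/batch", json=payload, auth=AUTH)
print(resp.status_code, resp.json())   # {"message": {"matches": [...]}, "code": "SUCCESS", ...}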