added master node logic

This commit is contained in:
parent e1192217f2
commit 834ded2811

12 changed files with 1455 additions and 0 deletions

.gitignore (vendored, Normal file, 2 additions)
@@ -0,0 +1,2 @@
simar-key
config.ini

master-node/Dockerfile (Normal file, 8 additions)
@@ -0,0 +1,8 @@
FROM python:3.12-slim-bookworm
COPY ./docker_requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
COPY . /app
WORKDIR /app
ENTRYPOINT ["python"]
CMD ["src/app.py"]
# ENTRYPOINT ["sh", "src/entrypoint.sh"]  # WSGI - gunicorn prod run

master-node/docker_requirements.txt (Normal file, 15 additions)
@@ -0,0 +1,15 @@
gunicorn == 21.2.0
requests == 2.26.0
httpx == 0.27.0
requests-oauthlib == 1.3.0
Flask == 3.0.2
Flask-HTTPAuth == 4.8.0
Flask-APScheduler == 1.13.1
pandas == 1.4.2
Werkzeug == 3.0.1
paramiko == 3.4.0
tabulate == 0.9.0
psycopg2-binary == 2.9.9
pydantic == 2.6.3
PyYAML == 6.0
websockets == 12.0

master-node/requirements.txt (Normal file, 17 additions)
@@ -0,0 +1,17 @@
gunicorn == 21.2.0
requests == 2.26.0
httpx == 0.27.0
requests-oauthlib == 1.3.0
Flask == 3.0.2
Flask-HTTPAuth == 4.8.0
Flask-APScheduler == 1.13.1
numpy == 1.26.2
pandas == 1.4.2
Werkzeug == 3.0.1
opencv-python-headless == 4.9.0.80
paramiko == 3.4.0
tabulate == 0.9.0
psycopg2 == 2.9.9
pydantic == 2.6.3
PyYAML == 6.0
websockets == 12.0

master-node/src/app.py (Normal file, 214 additions)
@@ -0,0 +1,214 @@
import os
import logging
import configparser
import datetime
import json

from flask import (
    Flask,
    flash,
    request,
    redirect,
    url_for,
    render_template,
    jsonify,
    make_response,
)
from flask_httpauth import HTTPBasicAuth
from flask_apscheduler import APScheduler
from pydantic import ValidationError
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash

ABS_PATH = os.path.dirname(os.path.realpath(__file__))
UPLOAD_FOLDER = os.path.join(ABS_PATH, "tmp")
ALLOWED_EXTENSIONS = {"csv", "mp4"}
MAX_CONTENT_PATH = 1000000000
CONFIG = os.path.join(ABS_PATH, "config.ini")
SCHEDULER_API_ENABLED = True
SHOW_LOG = True

from logger import LoggerFactory

# Configure the logger factory
LoggerFactory.setting(
    log_level=os.getenv("LOG_LEVEL", "INFO"),
    log_format='[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s',
    show=True,
)

from cluster import Cluster
from manager_utils import batch_identification, subscribe_to_detections
from cluster_state import AutoState


app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates"))
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH
app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED
app.config["SESSION_TYPE"] = "filesystem"

auth = HTTPBasicAuth()
scheduler = APScheduler()
app_logger = LoggerFactory.get_logger('APP')
AUTO = AutoState(debug=False)


@scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900)
def cluster_state():
    AUTO.check_cluster_state()
    # app_logger.debug("Finished with auto cluster state")


def get_config() -> dict:
    config = configparser.ConfigParser()
    config.read(CONFIG)
    auth_data = dict(config.items("API"))
    return auth_data


USER = get_config()["user"]
PASSWORD = get_config()["password"]

users = {
    USER: generate_password_hash(PASSWORD),
}


@auth.verify_password
def verify_password(username, password):
    if username in users and check_password_hash(users.get(username), password):
        return username


def allowed_file(filename):
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


@app.route("/")
@auth.login_required
def home():
    return render_template("index.html")


@app.route("/faces")
def faces():
    return render_template("faces.html")


@app.route("/cluster", methods=["GET", "POST"])
def cluster_activity():
    if request.method == "POST":
        if request.form.get("get_state") == "Get state":
            try:
                cluster = Cluster(CONFIG, ABS_PATH)
                command = (
                    "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml"
                )
                lines = cluster.run_ansible_command(command, use_configs=True)
                state = "".join(lines).replace("\\n", "\n")
                flash(state, category="success")
            except Exception as e:
                flash(f"Get state failed with: {e}", category="error")
                app_logger.error(f"Get state failed with: {e}")
        return redirect(request.url)

    if request.method == "GET":
        return render_template("cluster.html")


@app.route("/faces/batch", methods=["GET", "POST"])
def batch():
    if request.method == "POST":
        lookupIds = request.form.get("lookupIds")
        faceVectorArchive = request.form.get("faceVectorArchive")
        recognitionThreshold = request.form.get("recognitionThreshold")
        try:
            matches = batch_identification(
                lookupIds, faceVectorArchive, recognitionThreshold
            )
            flash(
                f"Batch identification match {matches}",
                category="success",
            )
        except Exception as e:
            flash(f"Batch identification failed with error: {e}", category="error")
            app_logger.error(f"Batch identification failed with error: {e}")
        return redirect(request.url)
    if request.method == "GET":
        return render_template("batch.html")


# curl -v -u <user>:<pass> http://localhost:5010/api/faces/v1.0/get-cluster-state
@app.route("/api/faces/v1.0/get-cluster-state", methods=["GET"])
@auth.login_required
def get_cluster_activity():
    try:
        cluster = Cluster(CONFIG, ABS_PATH)
        command = "ansible-playbook -i hosts/nodes.yml playbooks/ping_slaves.yml"
        state = cluster.run_ansible_command(command, use_configs=True)
        time = datetime.datetime.now()
        data = {
            "message": state,
            "code": "SUCCESS",
            "insertTimestamp": time,
        }
        return make_response(jsonify(data), 200)
    except Exception as e:
        data = {"message": f"Get state failed with error: {e}", "code": "FAILED"}
        app_logger.error(data)
        return make_response(jsonify(data), 400)


# curl -v -u <user>:<pass> -X POST -H "Content-type: application/json" -d @test_input.json http://localhost:5010/api/faces/v1.0/batch
@app.route("/api/faces/v1.0/batch", methods=["POST"])
@auth.login_required
def batch_id():
    content_type = request.headers.get("Content-Type")
    if content_type == "application/json":
        if request.is_json:
            data = request.json
            lookupIds = data.get("lookupIds")
            faceVectorArchive = data.get("faceVectorArchive")
            recognitionThreshold = data.get("recognitionThreshold")
            try:
                matches = batch_identification(
                    lookupIds, faceVectorArchive, recognitionThreshold
                )
                time = datetime.datetime.now()
                data = {
                    "message": matches,
                    "code": "SUCCESS",
                    "insertTimestamp": time,
                }
                return make_response(jsonify(data), 200)
            except Exception as e:
                data = {
                    "message": f"Batch identification failed with error: {e}",
                    "code": "FAILED",
                }
                app_logger.error(data)
                return make_response(jsonify(data), 400)
        else:
            data = {"message": f"Invalid json: {request}", "code": "FAILED"}
            return make_response(jsonify(data), 400)
    else:
        data = {"message": f"Wrong content type: {content_type}", "code": "FAILED"}
        return make_response(jsonify(data), 400)


def run_app():
    port = int(os.environ.get("PORT", 5010))
    app_logger.info(ABS_PATH)
    app.secret_key = get_config()["key"]
    mode = get_config()["mode"]
    scheduler.init_app(app)
    scheduler.start()
    if mode == "debug":
        app.run(debug=True, host="0.0.0.0", port=port)
    elif mode == "prod":
        app.run(port=port)


if __name__ == "__main__":
    run_app()
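
For context, a minimal sketch of calling the batch endpoint above from Python, assuming the service runs on localhost:5010 with the credentials from config.ini; the ids, threshold, and base64 payload are hypothetical placeholders, not values from the commit:

import requests  # pinned in the requirements files above

# Hypothetical payload; the shape follows the docstring in manager_utils.py.
payload = {
    "lookupIds": ["de630dfa-158a-4d20-a903-ded922132124"],
    "faceVectorArchive": [
        {
            "vectorId": "11111111-2222-3333-4444-555555555555",
            "faceVector": "<base64 of 512 floats>",
        }
    ],
    "recognitionThreshold": 0.7,
}
resp = requests.post(
    "http://localhost:5010/api/faces/v1.0/batch",
    json=payload,
    auth=("<user>", "<pass>"),  # HTTP Basic auth, required by @auth.login_required
)
print(resp.status_code, resp.json())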

master-node/src/cluster.py (Normal file, 108 additions)
@@ -0,0 +1,108 @@
import configparser
import json
import pandas as pd
import paramiko
import os
from tabulate import tabulate


class Cluster:
    def __init__(self, config_path: str, abs_path: str) -> None:
        self.abs_path = abs_path
        self.config = configparser.ConfigParser()
        self.config.read(config_path)
        self.auth_data = dict(self.config.items("CLUSTER"))
        self.ssh = paramiko.SSHClient()

    def connect(self) -> None:
        k = paramiko.RSAKey.from_private_key_file(
            os.path.join(self.abs_path, self.auth_data["key"])
        )
        host = self.auth_data["host"]
        port = int(self.auth_data["port"])  # configparser values are strings
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # self.ssh.load_system_host_keys()
        # self.ssh.get_host_keys().add(host, "ssh-rsa", k)
        self.ssh.connect(
            hostname=host,
            username=self.auth_data["user"],
            pkey=k,
            port=port,
        )

    def get_state(self) -> list:
        self.connect()
        get_state = "docker node ls"
        try:
            ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(get_state)
        except Exception as e:
            # exec_command itself failed, so ssh_stderr was never bound; report the exception only
            print(e)
            return []
        ssh_stdout.channel.set_combine_stderr(True)
        output = ssh_stdout.readlines()
        self.ssh.close()
        return output

    def run_ansible_command(self, command: str, use_configs=False) -> list:
        self.connect()
        if use_configs:
            command = "cd sinapse-configs/nodes/ansible && " + command
        try:
            ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(command)
            ssh_stdout.channel.set_combine_stderr(True)
            output = ssh_stdout.readlines()
        except Exception as e:
            # ssh_stderr may be unbound if exec_command raised, so report the exception only
            print(e)
            output = []

        self.ssh.close()
        # formatted_output = "".join(output)
        # return json.loads(formatted_output)
        return output

    def format_state(self, as_json=False) -> str:
        state = self.get_state()
        keys = state[0].split()
        f_state = []
        try:
            for node in state[1:-1]:
                buffer = {}
                line = node.split()
                for i, j in zip(line, keys):
                    buffer.update({j: i})
                f_state.append(buffer)
            df = pd.DataFrame(f_state)
            df = df.rename(columns={"MANAGER": "ENGINE"})
            df["MANAGER"] = ""
            leader = state[-1].split()
            df2 = pd.DataFrame(
                {
                    "ID": leader[0],
                    "HOSTNAME": leader[2],
                    "STATUS": leader[3],
                    "AVAILABILITY": leader[4],
                    "ENGINE": leader[6],
                    "MANAGER": leader[5],
                },
                index=[0],
            )
            df = pd.concat([df, df2], ignore_index=True)
            print(tabulate(df, headers="keys", tablefmt="psql"))
            if as_json:
                return df.to_json(orient="records")
            else:
                # return tabulate(df, headers="keys", tablefmt="psql")
                return df.to_dict("records")
        except Exception as e:
            print(e)
            return "Format failed"


if __name__ == "__main__":
    cluster = Cluster(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini"),
        os.path.dirname(os.path.realpath(__file__)),
    )
    print(cluster.format_state())

master-node/src/cluster_state.py (Normal file, 80 additions)
@@ -0,0 +1,80 @@
import json
import pprint
import os
import string
import time

from cluster import Cluster
from manager_agent import CLUSTER_STATE, NODES

ABS_PATH = os.path.dirname(os.path.realpath(__file__))
CONFIG = os.path.join(ABS_PATH, "config.ini")
JSON_FILE = "cluster_state_running_only.json"


class AutoState:
    def __init__(self, debug=False) -> None:
        self.cluster = Cluster(CONFIG, ABS_PATH)
        self.debug = debug

    def check_cluster_state(self, per_node=False, current_state=None) -> None:
        start_time = time.time()
        if current_state:
            state = current_state
        else:
            state = CLUSTER_STATE
        if per_node:
            for node in state:
                command = (
                    f"ansible-playbook -i hosts/{node}.yml playbooks/check_module.yml"
                )
                cluster_state = self.cluster.run_ansible_command(
                    command, use_configs=True
                )
                start = cluster_state.index(
                    "TASK [../roles/check : Display running info] ***********************************\n"
                )
                running = cluster_state[start + 3]
                running = running.translate({ord(c): None for c in string.whitespace})
                running = running.replace('"', "")
                state[node]["running"] = int(running)
                print(node)
                pprint.pprint(state[node])
        else:
            command = "ansible-playbook -i hosts/nodes.yml playbooks/check_module.yml"
            cluster_state = self.cluster.run_ansible_command(command, use_configs=True)
            start = cluster_state.index(
                "TASK [../roles/check : Display running info] ***********************************\n"
            )
            i, j = 1, 3
            for x in range(0, NODES):
                node = cluster_state[start + i]
                node = node[node.find("[") + 1 : node.find("]")]
                if node not in state:
                    continue
                running = cluster_state[start + j]
                running = running.translate({ord(c): None for c in string.whitespace})
                running = running.replace('"', "")
                state[node]["running"] = int(running)
                i += 5
                j += 5
        # pprint.pprint(
        #     ">>> Got cluster state by %s seconds" % (time.time() - start_time)
        # )
        start_time = time.time()
        json_state = json.dumps(state, indent=4)
        with open(os.path.join(ABS_PATH, JSON_FILE), "w") as outfile:
            outfile.write(json_state)
        # pprint.pprint(
        #     ">>> Saved cluster state by %s seconds" % (time.time() - start_time),
        # )
        # print(f">>> Saved cluster state to {JSON_FILE}")
        if self.debug:
            pprint.pprint(CLUSTER_STATE)
        if current_state:
            return state


if __name__ == "__main__":
    auto = AutoState()
    auto.check_cluster_state()
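
The stride-of-5 parsing in check_cluster_state assumes a fixed shape for the playbook output. A minimal sketch with made-up ansible output (the exact msg layout is an assumption, not taken from the commit) showing how the i/j offsets line up:

# Hypothetical ansible output, trimmed to the lines the parser relies on.
# After the "Display running info" TASK header, each node contributes a
# 5-line group: the node name at start + 1, the quoted count at start + 3.
sample = [
    "TASK [../roles/check : Display running info] ***********************************\n",
    "ok: [2-node] => {\n",   # start + 1: node name sits between [ and ]
    '    "msg": [\n',
    '        "3"\n',          # start + 3: running container count
    "    ]\n",
    "}\n",
    "ok: [3-node] => {\n",   # start + 6 == start + 1 + 5, hence the stride of 5
]
start = sample.index(
    "TASK [../roles/check : Display running info] ***********************************\n"
)
node_line = sample[start + 1]
node = node_line[node_line.find("[") + 1 : node_line.find("]")]
running = sample[start + 3].strip().replace('"', "")
print(node, int(running))  # 2-node 3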

master-node/src/docker-gunicorn.conf.py (Normal file, 2 additions)
@@ -0,0 +1,2 @@
accesslog = "/logs/access.log"
errorlog = "/logs/error.log"

master-node/src/load_balancer.py (Normal file, 547 additions)
@@ -0,0 +1,547 @@
import argparse
import errno
import os
import select
import socket
import sys

"""
There are hot and cold nodes.
At startup, the 6 hot nodes already run 6 containers, one per node. Cold nodes only have the image installed, with nothing running.
From there we allow a maximum of 12 containers per node.
When requests exceed the current number of running containers, containers are started on the hot nodes, up to 6 containers per node (36).
As the load grows further, the cold nodes are brought in, likewise up to 6 containers per node (30).
After that, containers on the hot nodes are scaled up to 12 per node (72), and the same for the cold nodes (60).
Under a sustained request flow that exceeds the number of running containers (132), we drop messages and report that the cluster is full.
If the load drops, containers on the hot nodes keep running for another 10 minutes and are then scaled down to 1 per node. On the cold nodes everything is shut down to 0 immediately.

All of this is implemented through functions that continuously poll the cluster state, persist the state, and keep counters of running containers tagged with the module label.
"""
"""
STATIC BALANCING
11 nodes in the cluster: 6 hot, 5 cold.
Estimate the impact of the number of faces on the maximum allowed load values.
MAX CONTAINERS = 176  TODO - const value - find max values - ?
MAX PER TYPE (1-16, 2-14, 3-12, 4-10)  TODO - const value - find max values - ?
Add a container blacklist - ?
VS
Parameter filtering - ?

0. 6 containers on the hot nodes 1:1, 0 containers on the cold nodes
1. Accept a subscription request from grada
2. Determine the detector type (4 types: 1-2-3-4)
3. Determine the current load of the hot nodes (direct command execution on the master)
4. If we have not reached half the limit (6 containers * 6 nodes = 36), use the current containers
   and start new ones with a different detector type (argument via ENV)
5. Once the half-limit of 36 containers is reached, the cold nodes start being loaded as in step 4 (5 * 6 = 30)
6. Once the half-limit of 36 + 30 = 66 containers is reached, start loading the hot nodes as in step 4
7. By analogy with the cold nodes up to 132 (for type 1); the maximum decreases in inverse proportion to
   the number of running modules with a heavy detector
"""


class LoadBalancer:
    sendbuffer = 4096
    maxbuffersize = 2**16
    maxservers = 132

    def __init__(
        self,
        host="0.0.0.0",
        port=80,
        targets=[],
        fallback=None,
        check_time=None,
        debug=False,
        family=socket.AF_INET,
        typ=socket.SOCK_STREAM,
    ):
        assert isinstance(host, str), ValueError("Expected string as host-argument")
        assert isinstance(port, int), ValueError("Expected int as port-argument")
        assert isinstance(targets, list), ValueError(
            "Expected list as targets-argument"
        )
        assert isinstance(fallback, tuple) or (fallback is None), ValueError(
            "Expected tuple or None as fallback-argument"
        )
        assert (
            isinstance(check_time, int)
            or isinstance(check_time, float)
            or (check_time is None)
        ), ValueError("Expected int or float as check_time-argument")
        assert (check_time is None) or check_time >= 0, ValueError(
            "Expected positive integer as check_time-argument"
        )
        assert isinstance(debug, bool), ValueError("Expected bool as debug-argument")
        for addr in targets:
            assert len(addr) == 2, ValueError(
                "Each address in targets needs to be a tuple of length 2"
            )
            assert isinstance(addr[0], str), ValueError(
                "Each address in targets needs to have a str at index 0"
            )
            assert isinstance(addr[1], int), ValueError(
                "Each address in targets needs to have an int at index 1"
            )
            assert addr[1] > 0, ValueError(
                "Each address in targets needs to have an integer larger than 0 as port"
            )
        self.host = host
        self.port = port
        self.targets = targets
        self.fallback = fallback
        self.check_time = check_time
        self.debug = debug
        self.family = family
        self.type = typ
        self.o2i = {}  # outgoing socket -> incoming peer socket
        self.i2o = {}  # incoming socket -> outgoing peer socket
        self.s2b = {}  # socket -> pending outgoing byte buffer
        self.o2t = {}  # outgoing socket -> target address
        self.t2n = {}  # target address -> number of open connections
        self.running = False
        self.bind_and_listen()

    def bind_and_listen(self):
        if self.debug:
            print("binding...")
        self.listen_s = socket.socket(self.family, self.type, 0)
        self.listen_s.bind((self.host, self.port))
        if self.debug:
            print("bound to: {host}:{port}".format(host=self.host, port=self.port))
        try:
            self.listen_s.listen(3)
        except OSError:
            self.listen_s.listen(1)

    def mainloop(self):
        if self.debug:
            print("entering mainloop.")
        self.running = True
        try:
            while self.running:
                checkread = checkex = (
                    [self.listen_s] + list(self.i2o.keys()) + list(self.o2i.keys())
                )
                checkwrite = [s for s in self.s2b.keys() if len(self.s2b[s]) > 0]
                toread, towrite, exceptional = select.select(
                    checkread, checkwrite, checkex, self.check_time
                )
                if self.debug:
                    print("data available on: ", toread)
                    print("buffer free on: ", towrite)
                    print("errors on: ", exceptional)
                if len(toread) == 0 and len(towrite) == 0 and len(exceptional) == 0:
                    continue
                for s in exceptional:
                    if s is self.listen_s:
                        if self.debug:
                            print("error in listening socket!")
                        self.running = False
                        raise RuntimeError("Error in listening socket!")
                    elif s in self.o2i:
                        if self.debug:
                            print("error in client connection: closing socket.")
                        self.close_s(s)
                    else:
                        s2c = s
                        c2s = self.i2o[s2c]
                        if c2s in self.o2t:
                            old_peer = self.o2t[c2s]
                            if old_peer == self.fallback or (self.fallback is None):
                                old_peer = None
                        else:
                            old_peer = None
                        if old_peer is None:
                            if self.debug:
                                print("fallback not possible, closing s")
                            self.close_s(c2s)
                        else:
                            if self.debug:
                                print(
                                    "error in connection to normal server, instead connecting to fallback-server."
                                )
                                print("unregistering old peer")
                            try:
                                s2c.shutdown(socket.SHUT_RDWR)
                            except OSError:
                                pass
                            try:
                                s2c.close()
                            except OSError:
                                pass
                            if s2c in self.s2b:
                                old_buff = self.s2b[s2c]
                                if self.debug:
                                    print(
                                        "redirecting {n} bytes to fallback server".format(
                                            n=len(old_buff)
                                        )
                                    )
                                del self.s2b[s2c]
                            else:
                                old_buff = b""
                            if s2c in self.i2o:
                                del self.i2o[s2c]
                            if c2s in self.o2t:
                                t = self.o2t[c2s]
                                if t in self.t2n:
                                    self.t2n[t] = max(0, self.t2n[t] - 1)
                            self.o2t[c2s] = self.fallback
                            peer = socket.socket(self.family, self.type, 0)
                            peer.setblocking(False)
                            self.s2b[peer] = old_buff
                            self.o2i[c2s] = peer
                            self.i2o[peer] = c2s
                            self.o2t[c2s] = self.fallback
                            try:
                                err = peer.connect_ex(self.fallback)
                            except Exception as e:
                                try:
                                    err = e.args[0]
                                except Exception:
                                    err = None
                                # WSAEWOULDBLOCK only exists on Windows
                                if err in (errno.EWOULDBLOCK, getattr(errno, "WSAEWOULDBLOCK", None)):
                                    pass
                                else:
                                    if self.debug:
                                        print("error during connect to fallback:", e)
                                    self.close_s(c2s)
                                    continue
                            # 0 means the non-blocking connect completed immediately
                            if err in (0, errno.EINPROGRESS, getattr(errno, "WSAEWOULDBLOCK", None)):
                                pass
                            else:
                                if self.debug:
                                    print(
                                        "error during connect to fallback:",
                                        errno.errorcode[err],
                                    )
                                self.close_s(c2s)
                for s in towrite:
                    if s not in self.s2b:
                        continue
                    if len(self.s2b[s]) < self.sendbuffer:
                        tosend = self.s2b[s]
                    else:
                        tosend = self.s2b[s][: self.sendbuffer]
                    if self.debug:
                        print(
                            "sending {n} bytes (left: {t} bytes)".format(
                                n=len(tosend), t=len(self.s2b[s]) - len(tosend)
                            )
                        )
                    try:
                        sent = s.send(tosend)
                    except socket.error as e:
                        if self.debug:
                            print("error writing buffer:", e)
                        self.close_s(s)
                        continue
                    if self.debug:
                        print("sent {n} bytes.".format(n=sent))
                    if sent >= len(self.s2b[s]):
                        self.s2b[s] = b""
                    self.s2b[s] = self.s2b[s][sent:]
                for s in toread:
                    if s is self.listen_s:
                        if self.debug:
                            print("got request")
                        if len(self.targets) == 0:
                            if self.debug:
                                print("using fallback server")
                            target = self.fallback
                        else:
                            c = 9999999
                            target = None
                            # pick the registered target with the fewest open connections
                            for ta in self.targets:
                                if ta not in self.t2n:
                                    self.t2n[ta] = 0
                                n = self.t2n[ta]
                                if n < c:
                                    c = n
                                    target = ta
                        if target is None:
                            if self.debug:
                                print("cannot find a target!")
                            continue
                        if target in self.t2n:
                            self.t2n[target] += 1
                        else:
                            self.t2n[target] = 1
                        if self.debug:
                            print("target is:", target)
                        new_s, addr = self.listen_s.accept()
                        new_s.setblocking(False)
                        peer = socket.socket(self.family, self.type, 0)
                        peer.setblocking(False)
                        self.s2b[new_s] = b""
                        self.s2b[peer] = b""
                        self.o2i[new_s] = peer
                        self.i2o[peer] = new_s
                        self.o2t[new_s] = target
                        try:
                            err = peer.connect_ex(target)
                        except Exception as e:
                            try:
                                err = e.args[0]
                            except Exception:
                                err = None
                            if err in (errno.EWOULDBLOCK, getattr(errno, "WSAEWOULDBLOCK", None)):
                                pass
                            else:
                                if self.debug:
                                    print("error during connect to target:", e)
                                self.close_s(new_s)
                                continue
                        if err in (0, errno.EINPROGRESS, getattr(errno, "WSAEWOULDBLOCK", None)):
                            pass
                        else:
                            if self.debug:
                                print(
                                    "error during connect to target:",
                                    errno.errorcode[err],
                                )
                            self.close_s(new_s)
                    else:
                        if s in self.i2o:
                            p = self.i2o[s]
                        elif s in self.o2i:
                            p = self.o2i[s]
                        else:
                            if self.debug:
                                print("a socket has no peer registered, closing it.")
                            self.close_s(s)
                            continue
                        try:
                            peerbufferlength = len(self.s2b[p])
                        except KeyError:
                            peerbufferlength = 0
                            self.s2b[p] = b""
                        if peerbufferlength < self.maxbuffersize:
                            maxread = self.maxbuffersize - peerbufferlength
                            try:
                                data = s.recv(maxread)
                            except socket.error as e:
                                err = e.args[0]
                                if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
                                    continue
                                else:
                                    if self.debug:
                                        print("error while receiving:", e)
                                    self.close_s(s)
                                    continue
                            if self.debug:
                                print("received {n} bytes.".format(n=len(data)))
                            if len(data) == 0:
                                if self.debug:
                                    print("connection closed.")
                                self.close_s(s)
                            else:
                                self.s2b[p] += data
                        else:
                            continue
        finally:
            if self.debug:
                print("closing all connections...")
            try:
                self.listen_s.close()
            except OSError:
                pass
            for s in list(self.o2i.keys()) + list(self.i2o.keys()):
                self.close_s(s)

    def close_s(self, s):
        try:
            s.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass
        try:
            s.close()
        except OSError:
            pass
        if s in self.s2b:
            del self.s2b[s]
        if s in self.o2t:
            t = self.o2t[s]
            del self.o2t[s]
            if t in self.t2n:
                self.t2n[t] = max(self.t2n[t] - 1, 0)
        if s in self.o2i:
            p = self.o2i[s]
            del self.o2i[s]
            self.close_s(p)
        if s in self.i2o:
            p = self.i2o[s]
            del self.i2o[s]
            self.close_s(p)

    def add_target(self, addr):
        assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument")
        assert len(addr) == 2, ValueError(
            "Expected a tuple of length 2 as addr-argument"
        )
        assert isinstance(addr[0], str), ValueError(
            "Expected addr-argument to have a string at index 0"
        )
        assert isinstance(addr[1], int), ValueError(
            "Expected addr-argument to have an integer at index 1"
        )
        if addr in self.targets:
            raise ValueError("Target already registered!")
        else:
            self.targets.append(addr)

    def remove_target(self, addr):
        assert isinstance(addr, tuple), ValueError("Expected a tuple as addr-argument")
        assert len(addr) == 2, ValueError(
            "Expected a tuple of length 2 as addr-argument"
        )
        assert isinstance(addr[0], str), ValueError(
            "Expected addr-argument to have a string at index 0"
        )
        assert isinstance(addr[1], int), ValueError(
            "Expected addr-argument to have an integer at index 1"
        )
        if addr in self.targets:
            self.targets.remove(addr)
        else:
            raise ValueError("Target not found!")

    def stop(self):
        if not self.running:
            raise RuntimeError("Server not running!")
        self.running = False
        self.check_time = 0


def load_from_file(path, ignore_errors=False):
    targets = []
    with open(path, "r") as f:
        for line in f:
            try:
                if line.startswith("#"):
                    continue
                if line.count(":") != 1:
                    raise SyntaxError(
                        "Error in line '{l}': expected exactly one ':'!".format(l=line)
                    )
                host, port = line.split(":")
                try:
                    port = int(port)
                except ValueError:
                    raise SyntaxError(
                        "Error in line '{l}': cannot convert port to int!".format(
                            l=line
                        )
                    )
                if len(host) == 0:
                    raise SyntaxError(
                        "Error in line '{l}': invalid host format!".format(l=line)
                    )
                targets.append((host, port))
            except SyntaxError:
                if not ignore_errors:
                    raise
    return targets


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A Load-Balancer")
    parser.add_argument("host", help="host/ip to bind to")
    parser.add_argument("port", type=int, help="port to bind to")
    parser.add_argument(
        "-d", action="store_true", dest="debug", help="enable debug-mode"
    )
    parser.add_argument(
        "-p",
        action="store",
        dest="path",
        help="load target list from file",
        required=False,
        default=None,
    )
    parser.add_argument(
        "-f",
        action="store",
        dest="fallback",
        help="target to relay connections to if no servers are available",
        required=False,
        default=None,
    )
    parser.add_argument(
        "-t",
        action="store",
        dest="targets",
        help="targets to spread connections to",
        required=False,
        default=None,
        nargs="+",
    )
    ns = parser.parse_args()
    if ns.targets is None:
        targets = []
    else:
        targets = []
        for ta in ns.targets:
            if ta.count(":") != 1:
                print(
                    "SyntaxError in command-line target-list: expected exactly one ':'!"
                )
                sys.exit(1)
            host, port = ta.split(":")
            if len(host) == 0:
                print("SyntaxError in command-line target-list: invalid host!")
                sys.exit(1)
            try:
                port = int(port)
                if port <= 0:
                    raise ValueError
            except ValueError:
                print("SyntaxError in command-line target-list: invalid port!")
                sys.exit(1)
            targets.append((host, port))
    if ns.fallback is not None:
        if ns.fallback.count(":") != 1:
            print("SyntaxError in fallback-argument: expected exactly one ':'!")
            sys.exit(1)
        host, port = ns.fallback.split(":")
        if len(host) == 0:
            print("SyntaxError in fallback-argument: invalid host!")
            sys.exit(1)
        try:
            port = int(port)
            if port <= 0:
                raise ValueError
        except ValueError:
            print("SyntaxError in fallback-argument: invalid port!")
            sys.exit(1)
        fallback = (host, port)
    else:
        fallback = None
    if ns.path is None:
        pass
    elif not os.path.isfile(ns.path):
        print("Error: File not found!")
        sys.exit(1)
    else:
        targets += load_from_file(ns.path, False)
    if len(targets) == 0:
        print("Error: no targets found!")
        sys.exit(1)
    lb = LoadBalancer(ns.host, ns.port, targets, fallback, None, ns.debug)
    try:
        lb.mainloop()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        try:
            lb.stop()
        except RuntimeError:
            pass
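
For reference, the capacity tiers described in the balancing notes above reduce to a few lines of arithmetic. This helper is illustrative only; the function name is not part of the commit, and the per-node limits follow the 6 hot / 5 cold layout from the docstring:

# Illustrative only: reproduces the tier arithmetic from the docstring above.
HOT_NODES, COLD_NODES = 6, 5
PER_NODE_LOW, PER_NODE_HIGH = 6, 12

def capacity_tiers():
    hot_low = HOT_NODES * PER_NODE_LOW      # 36: hot nodes at 6 containers each
    cold_low = COLD_NODES * PER_NODE_LOW    # 30: cold nodes join, 6 each, 66 total
    hot_high = HOT_NODES * PER_NODE_HIGH    # 72: hot nodes raised to 12 each
    cold_high = COLD_NODES * PER_NODE_HIGH  # 60: cold nodes raised to 12 each
    return hot_low, hot_low + cold_low, hot_high + cold_low, hot_high + cold_high

print(capacity_tiers())  # (36, 66, 102, 132): beyond 132, requests are dropped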

master-node/src/logger.py (Normal file, 77 additions)
@@ -0,0 +1,77 @@
import logging
import os
import sys
from typing import Union

LOG_FILE = os.path.join(os.getcwd(), "logfile.log")


class LoggerFactory:
    """
    Class for logging the behaviour of data exporting - an object of the ExportingTool class
    """
    log_level = logging.INFO
    formatter = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s")
    show = True

    @classmethod
    def setting(cls,
                log_level: Union[str, int] = logging.INFO,
                log_format: str = "%(asctime)s — %(name)s — %(levelname)s — %(message)s",
                show: bool = True) -> None:
        """
        Configuration method which sets the factory-wide logging parameters

        Args:
            log_level (str, int): logging level as a string or as logging.<log level> (like logging.INFO)
            log_format (str): logging format of the saved message
            show (bool): if set, all logs will also be shown in the terminal
        """
        if isinstance(log_level, str):
            log_level: int = logging.getLevelName(log_level)
        cls.show = show
        cls.log_level = log_level
        cls.formatter = logging.Formatter(log_format)

    @classmethod
    def get_console_handler(cls) -> logging.StreamHandler:
        """
        Class method whose aim is getting a console handler to show logs in the terminal

        Returns:
            logging.StreamHandler: handler object for streaming output to the terminal
        """
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(cls.formatter)
        return console_handler

    @classmethod
    def get_file_handler(cls) -> logging.FileHandler:
        """
        Class method whose aim is getting a file handler to write logs to the file LOG_FILE

        Returns:
            logging.FileHandler: handler object for streaming output to a file
        """
        file_handler = logging.FileHandler(LOG_FILE, mode="w")
        file_handler.setFormatter(cls.formatter)
        return file_handler

    @classmethod
    def get_logger(cls, logger_name: str):
        """
        Class method which creates a logger with the given name

        Args:
            logger_name (str): name for the logger

        Returns:
            logger: object of the Logger class
        """
        logger = logging.getLogger(logger_name)
        logger.setLevel(cls.log_level)
        if cls.show:
            logger.addHandler(cls.get_console_handler())
        logger.addHandler(cls.get_file_handler())
        logger.propagate = False
        return logger

master-node/src/manager_agent.py (Normal file, 228 additions)
@@ -0,0 +1,228 @@
import json
import os
import pprint
import random
import string
from typing import Union, Tuple, Any

from module_size_calculator import get_face_recognition_module_resource_usage
from cluster import Cluster
from utils.patterns import Singleton

MAX_SYSTEM_MEMORY = 3000  # Mb
MAX_DEVICE_MEMORY = 7500  # Mb
MAX_SCORE = 110  # %

# MIN_PORT = 1000
# MAX_PORT = 2000
USED_PORTS = set()

NODES = 11

ABS_PATH = os.path.dirname(os.path.realpath(__file__))
CONFIG = os.path.join(ABS_PATH, "config.ini")
USAGE_CONFIG = os.path.join(ABS_PATH, "docker_module_resource_usage.yaml")
RUNNING_STATE = "cluster_state_running_only.json"
FULL_STATE = "cluster_state_full.json"
SOFT_STATE = {"image": "", "detector_size": "", "max_faces": "", "min_fps": ""}

HARD_STATE = {
    "SYSTEM_MEMORY": 0,
    "DEVICE_MEMORY": 0,
    "CONTAINER_CAPACITY": 0,
    "CPU_USAGE": None,
    "DEVICE_USAGE": None,
}

CLUSTER_STATE = {
    # "1-node": {
    #     "soft_state": [],
    #     "hard_state": dict(HARD_STATE),
    #     "running": 0,
    #     "min_port": 1024,
    #     "max_port": 1224,
    # },
    "2-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),  # per-node copy, so nodes do not share one dict
        "running": 0,
        "min_port": 2024,
        "max_port": 2224,
    },
    "3-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 3024,
        "max_port": 3224,
    },
    "4-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 4024,
        "max_port": 4224,
    },
    "5-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 5024,
        "max_port": 5224,
    },
    "6-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 6024,
        "max_port": 6224,
    },
    "7-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 7024,
        "max_port": 7224,
    },
    "8-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 8024,
        "max_port": 8224,
    },
    "9-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 9024,
        "max_port": 9224,
    },
    "10-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 10024,
        "max_port": 10224,
    },
    "11-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 11024,
        "max_port": 11224,
    },
    "12-node": {
        "soft_state": [],
        "hard_state": dict(HARD_STATE),
        "running": 0,
        "min_port": 12024,
        "max_port": 12224,
    },
}


class Agent(Singleton):
    def __init__(self, debug=False) -> None:
        self.cluster = Cluster(CONFIG, ABS_PATH)
        self.debug = debug

    def read_cluster_state(self) -> None:
        try:
            with open(os.path.join(ABS_PATH, RUNNING_STATE), "r") as f:
                running_state = json.load(f)
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Cluster state is not available at {RUNNING_STATE}"
            ) from e
        for node in running_state:
            if node not in CLUSTER_STATE:
                continue
            CLUSTER_STATE[node]["running"] = running_state[node]["running"]

    def sort_state(self) -> list:
        sorted_state_list = sorted(CLUSTER_STATE.items(), key=lambda x: x[1]["running"])
        if self.debug:
            print("Sorted cluster state")
            pprint.pprint(sorted_state_list)
        return sorted_state_list

    def get_node_port(
        self,
        input_image_number,
        image_width,
        image_height,
        max_faces,
        min_fps,
        detector_size,
        image,
    ) -> Union[None, Tuple[Any, int]]:
        hard_state = get_face_recognition_module_resource_usage(
            USAGE_CONFIG,
            input_image_number=input_image_number,
            image_width=image_width,
            image_height=image_height,
            max_faces=max_faces,
            min_fps=min_fps,
            detector_size=detector_size,
        )
        self.read_cluster_state()
        sorted_state = self.sort_state()
        # walk the nodes from least to most loaded, take the first one that fits
        for node in sorted_state:
            future_sys_mem = (
                CLUSTER_STATE[node[0]]["hard_state"]["SYSTEM_MEMORY"]
                + hard_state["SYSTEM_MEMORY"]
            )
            future_dev_mem = (
                CLUSTER_STATE[node[0]]["hard_state"]["DEVICE_MEMORY"]
                + hard_state["DEVICE_MEMORY"]
            )
            future_node_cap = (
                CLUSTER_STATE[node[0]]["hard_state"]["CONTAINER_CAPACITY"]
                + hard_state["CONTAINER_CAPACITY"]
            )
            if (
                future_sys_mem < MAX_SYSTEM_MEMORY
                and future_dev_mem < MAX_DEVICE_MEMORY
                and future_node_cap < MAX_SCORE
            ):
                CLUSTER_STATE[node[0]]["soft_state"].append(
                    {
                        "image": image,
                        "detector_size": detector_size,
                        "max_faces": max_faces,
                        "min_fps": min_fps,
                    }
                )
                CLUSTER_STATE[node[0]]["hard_state"] = hard_state
                port = random.randint(
                    CLUSTER_STATE[node[0]]["min_port"],
                    CLUSTER_STATE[node[0]]["max_port"],
                )
                while port in USED_PORTS:
                    port = random.randint(
                        CLUSTER_STATE[node[0]]["min_port"],
                        CLUSTER_STATE[node[0]]["max_port"],
                    )
                USED_PORTS.add(port)
                if self.debug:
                    print("Future cluster state")
                    pprint.pprint(CLUSTER_STATE)
                json_state = json.dumps(CLUSTER_STATE, indent=4)
                with open(os.path.join(ABS_PATH, FULL_STATE), "w") as outfile:
                    outfile.write(json_state)
                return node[0], port
            else:
                continue


if __name__ == "__main__":
    agent = Agent()
    node, port = agent.get_node_port(
        input_image_number=8,
        image_width=2560,
        image_height=1440,
        max_faces=5,
        min_fps=12,
        detector_size=640,
        image="test",
    )
    print(node, port)

master-node/src/manager_utils.py (Normal file, 157 additions)
@@ -0,0 +1,157 @@
import asyncio
import configparser
import logging
from typing import List, Dict, Any, Optional, Tuple, Union

# ============================= Batch identification ==========================

import numpy as np
import copy

from detectors_stream_worker import DetectorsStreamWorker, DetectorsStreamWorkerManager
from utils.base64_convector import base64_2_vector
from face_identification import vector_identification
import os

CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.ini")
CONFIG = configparser.ConfigParser()
CONFIG.read(CONFIG_PATH)
CONF_DATA = dict(CONFIG.items("MANAGER"))

from db_tools import DataBaseManager

app_logger = logging.getLogger('APP')


def search_vectors_in_db(
    lookupIds: List[str],
    target,  # ids of the required vectors
) -> Union[str, bytes, List[List[float]]]:
    manager = DataBaseManager(CONFIG_PATH)
    lookupIds_vectors = manager.get_vectors(lookupIds, target=target)
    return lookupIds_vectors


def batch_identification(
    lookupIds: List[str],
    faceVectorArchive: List[Dict[str, Any]],
    recognitionThreshold: float,
):
    """
    Searches faceVectorArchive for faces from the lookup database.

    Args:
        lookupIds (List[str]): list of ids of face vectors from the people-search database
            against which unknown faces must be compared. Format:
            [
                "de630dfa-158a-4d20-a903-ded922132124",
                ...
            ]
        faceVectorArchive (List[Dict[str, Any]]): face vectors from video cameras that must be
            compared with the people from the search database. Format:
            [
                {
                    vectorId: "de630dfa-158a-4d20-a903-ded922132124",
                    faceVector: base64 string (512 encoded floats)
                }
                ...
            ]
        recognitionThreshold (float): similarity threshold below which faces are considered different.

    Return:
        {
            matches: [
                {
                    faceVectorId: "de630dfa-158a-4d20-a903-ded922132124", - id of the archive vector
                        on which the match was found
                    vectorId: "de630dfa-158a-4d20-a903-ded922132124", - id of the vector from the lookup database
                    recognitionConfidence: float - confidence level (must be greater than the threshold
                        for a detection to count as a match)
                },
                ...
            ]
        }
    """
    # Look up the vectors in the search database by lookupIds
    lookupIds_vectors: Union[str, bytes, List[List[float]]] = search_vectors_in_db(
        lookupIds, target="base_id"
    )
    # Build the lookup database structure
    lookupVectors: List[Dict[str, Any]] = [
        {"vectorId": name, "faceVector": vector}
        for name, vector in zip(lookupIds, lookupIds_vectors)
    ]

    # Decode the vectors of both search bases
    # Walk over each search base
    faceVectorArchive = copy.deepcopy(faceVectorArchive)
    for search_base in [lookupVectors, faceVectorArchive]:
        # Walk over each record
        for row in search_base:
            # If the vector is a string or bytes, decode it
            vector = row.get("faceVector")
            if isinstance(vector, (str, bytes)):
                row["faceVector"] = base64_2_vector(vector, dtype=np.float32).tolist()

    # Call the search method
    identification_result = vector_identification(
        lookupVectors=lookupVectors,
        faceVectorArchive=faceVectorArchive,
        recognitionThreshold=recognitionThreshold,
    )

    # Build the result
    result = [
        {
            "vectorId": row["vectorId"],
            "faceVectorId": row["detectionId"],
            "recognitionConfidence": row["recognitionConfidence"],
        }
        for row in identification_result
    ]

    return {"matches": result}


def subscribe_to_detections(
    stream: str,
    detectionFrequency: int,
    recognitionThreshold: float,
    searchBaseIds: List[str],
    bboxStream: Optional[str] = None,
    eventsStream: Optional[str] = None,
    bboxThreshold: Optional[float] = None,
    detectionThreshold: Optional[float] = None,
    controlArea: Optional[List[Tuple[int, int]]] = None,
    minFaceSize: Optional[float] = None,
    maxFacesPerFrame: Optional[int] = None,
    enableAttributes: bool = False
):
    worker = DetectorsStreamWorker(
        stream=stream,
        detectionFrequency=detectionFrequency,
        recognitionThreshold=recognitionThreshold,
        searchBaseIds=searchBaseIds,
        bboxStream=bboxStream,
        eventsStream=eventsStream,
        bboxThreshold=bboxThreshold,
        detectionThreshold=detectionThreshold,
        controlArea=controlArea,
        minFaceSize=minFaceSize,
        maxFacesPerFrame=maxFacesPerFrame,
        enableAttributes=enableAttributes,
    )
    # Get the async event loop
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
    app_logger.debug('worker create')
    try:
        loop.run_until_complete(worker.create())
    except Exception as e:
        app_logger.debug(f'worker close with error: {e}')
        loop.run_until_complete(worker.close())
        raise e
    else:
        app_logger.debug('worker run')
        DetectorsStreamWorkerManager.add(worker)
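
A minimal call sketch for subscribe_to_detections, assuming a reachable detector stream behind DetectorsStreamWorker; every concrete value here is a placeholder, not a value from the commit:

from manager_utils import subscribe_to_detections

# Placeholder stream ids and thresholds; required parameters follow the signature above.
subscribe_to_detections(
    stream="rtsp://camera-host/stream1",  # hypothetical source stream
    detectionFrequency=5,
    recognitionThreshold=0.7,
    searchBaseIds=["de630dfa-158a-4d20-a903-ded922132124"],
    bboxStream="bbox-out",                # optional output, hypothetical name
    maxFacesPerFrame=5,
)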