From 9632552cffc5ac109028c65dd6bdcc7a8fe33a6f Mon Sep 17 00:00:00 2001 From: Chebart Date: Thu, 23 Oct 2025 13:30:24 +0300 Subject: [PATCH] added files to master-node related to custom modules --- master-node/src/app.py | 87 ++++++++----- master-node/src/custom_modules/__init__.py | 1 + .../src/custom_modules/get_env_vars.py | 56 +++++++++ master-node/src/custom_modules/manager.py | 119 ++++++++++++++++++ .../src/custom_modules/manager_methods.py | 79 ++++++++++++ 5 files changed, 313 insertions(+), 29 deletions(-) create mode 100644 master-node/src/custom_modules/__init__.py create mode 100644 master-node/src/custom_modules/get_env_vars.py create mode 100644 master-node/src/custom_modules/manager.py create mode 100644 master-node/src/custom_modules/manager_methods.py diff --git a/master-node/src/app.py b/master-node/src/app.py index 8b9afc3..4b9873b 100644 --- a/master-node/src/app.py +++ b/master-node/src/app.py @@ -7,7 +7,7 @@ from flask import ( flash, request, redirect, - url_for, + url_for, render_template, jsonify, make_response, @@ -18,62 +18,73 @@ from pydantic import ValidationError from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash +from logger import LoggerFactory +from cluster import Cluster +from cluster_state import AutoState +from custom_modules import reset_to_initial_state, create_custom_containers, stop_all_custom_containers + + +# -------------------------- +# Проинициализируем константы +# -------------------------- + ABS_PATH = os.path.dirname(os.path.realpath(__file__)) MAX_CONTENT_PATH = 1000000000 CONFIG = os.path.join(ABS_PATH, "config.ini") SCHEDULER_API_ENABLED = True SHOW_LOG = True -from logger import LoggerFactory - - LoggerFactory.setting( log_level=os.getenv("LOG_LEVEL", "INFO"), log_format="[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s", show=True, ) - -from cluster import Cluster -from cluster_state import AutoState - - -app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates")) -app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH -app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED -app.config["SESSION_TYPE"] = "filesystem" - auth = HTTPBasicAuth() scheduler = APScheduler() app_logger = LoggerFactory.get_logger("APP") AUTO = AutoState(debug=False) - -# @scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900) -# def cluster_state(): -# AUTO.check_cluster_state() -# # app_logger.debug("Finished with auto cluster state") - - def get_config() -> dict: config = configparser.ConfigParser() config.read(CONFIG) auth_data = dict(config.items("API")) return auth_data - USER = get_config()["user"] PASSWORD = get_config()["password"] - users = { USER: generate_password_hash(PASSWORD), } - @auth.verify_password def verify_password(username, password): if username in users and check_password_hash(users.get(username), password): return username +# -------------------------- +# Создадим приложение +# -------------------------- + +app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates")) +app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH +app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED +app.config["SESSION_TYPE"] = "filesystem" + +# -------------------------- +# Настроим хуки и ручки приложения +# -------------------------- + +@app.before_first_request +def startup(): + # Приведем систему в начальное состояние + reset_to_initial_state() + app_logger.info("master-node запущена!") + +@app.teardown_appcontext +def shutdown(exception=None): + # Остановим все контейнеры + stop_all_custom_containers() + app_logger.info("master-node остановлена!") # curl -u : -d "playbook=ping_workers&args=hosts=workers" -v http://localhost:5010/api/v1.0/run_ansible @app.route("/api/v1.0/run_ansible", methods=["POST"]) @@ -100,8 +111,30 @@ def run_ansible(): app_logger.error(data) return make_response(jsonify(data), 400) +# -------------------------- +# Методы, которыми управляет scheduler +# -------------------------- -def run_app(): +#@scheduler.task( +# "interval", +# id="cluster_state", +# seconds=30, +# misfire_grace_time=900 +#) +#def cluster_state(): +# AUTO.check_cluster_state() +# app_logger.debug("Finished with auto cluster state") + +#@scheduler.task( +# "interval", +# id="cluster_state", +# seconds=30, +# misfire_grace_time=900 +#) +#def custom_modules(): +# create_custom_containers() + +if __name__ == "__main__": port = int(os.environ.get("PORT", 5010)) app_logger.info(ABS_PATH) app.secret_key = get_config()["key"] @@ -112,7 +145,3 @@ def run_app(): app.run(debug=True, host="0.0.0.0", port=port) elif mode == "prod": app.run(port=port) - - -if __name__ == "__main__": - run_app() diff --git a/master-node/src/custom_modules/__init__.py b/master-node/src/custom_modules/__init__.py new file mode 100644 index 0000000..9dd6f6c --- /dev/null +++ b/master-node/src/custom_modules/__init__.py @@ -0,0 +1 @@ +from .manager import reset_to_initial_state, create_custom_containers, stop_all_custom_containers \ No newline at end of file diff --git a/master-node/src/custom_modules/get_env_vars.py b/master-node/src/custom_modules/get_env_vars.py new file mode 100644 index 0000000..c62ae48 --- /dev/null +++ b/master-node/src/custom_modules/get_env_vars.py @@ -0,0 +1,56 @@ +import os + +def load_env_files( + files: list +)-> dict: + """Извлечение переменных окружения из файлов""" + env = {} + for file in files: + with open(file) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + key, value = line.split("=", 1) + env[key] = value + return env + +def get_env_vars(): + # Проверяем наличие кодовой базы + # TODO: ansible playbook + ml_path = "/path-to-ml" + + # Получаем переменные + env_files = [f"{ml_path}/.env", f"{ml_path}/docker/.env.dev", f"{ml_path}/docker/.env.compose_vars"] + all_envs = load_env_files(env_files) + + # Формируем аргументы для сборки и запуска контейнеров + build_args = { + "path": f"{ml_path}/src_ml/", + "dockerfile": "docker/custom_modules_api/Dockerfile", + "tag": "", + "buildargs": { + "CUDA_VERSION": all_envs["CUDA_VERSION"], + "TORCH_VERSION": all_envs["TORCH_VERSION"] + } + } + run_envs = { + "API_HOST": '0.0.0.0', + "API_PORT": 8000, + "PROCESSED_INSTANCE_LOG_PERIOD": all_envs['PROCESSED_INSTANCE_LOG_PERIOD'] or 60, + "ALL_BOOTSTRAP_SERVERS": f"{all_envs['INTERNAL_BROKER_NAME']}:9090", + "VIDEO_FRAMES_BOOTSTRAP_SERVERS": f"{all_envs['INTERNAL_BROKER_NAME']}:9090", + "VIDEO_FRAMES_SCHEMA_REGISTRY_URL": f"http://{all_envs['INTERNAL_SCHEMA_REGISTRY_NAME']}:8081", + "ML_RESULT_BOOTSTRAP_SERVERS": f"{all_envs['INTERNAL_BROKER_NAME']}:9090", + "ML_RESULT_SCHEMA_REGISTRY_URL": f"http://{all_envs['INTERNAL_SCHEMA_REGISTRY_NAME']}:8081", + # Настройки подключения к базе настроек + "REMOTE_SETTINGS_REDIS_SERVER_HOST": all_envs['ML_SETTINGS_REDIS_NAME'], + "REMOTE_SETTINGS_REDIS_SERVER_PORT": all_envs['ML_SETTINGS_REDIS_PORT'] or 6379, + "REMOTE_SETTINGS_REDIS_SERVER_PASSWORD": all_envs['ML_SETTINGS_REDIS_PASSWORD', 'password123'], + "REMOTE_SETTINGS_MODEL_CONTEXT_PREFIX": 'models_', + # Доступ к хранению весов модели в MinIO + "WEIGHTS_MINIO_SERVERS": f"{all_envs['FRAME_MINIO_HOST']}:{all_envs['FRAME_MINIO_PORT']}", + "WEIGHTS_MINIO_USER": all_envs['MINIO_ROOT_USER', 'minioadmin'], + "WEIGHTS_MINIO_PASSWORD": all_envs['MINIO_ROOT_PASSWORD', 'minioadmin'] + } + + return all_envs, build_args, run_envs \ No newline at end of file diff --git a/master-node/src/custom_modules/manager.py b/master-node/src/custom_modules/manager.py new file mode 100644 index 0000000..2b144ee --- /dev/null +++ b/master-node/src/custom_modules/manager.py @@ -0,0 +1,119 @@ +from multiprocessing import Process +import requests + +from .manager_methods import start_container, stop_container +from .get_env_vars import get_env_vars +from ..logger import LoggerFactory + +_AUTH_URL = "https://api.statanly.com:8443/api/auth/login" +_ALL_MODULES_URL = "https://api.statanly.com:8443/api/custom-modules" +manager_logger = LoggerFactory.get_logger("CustomModuleManager") + + +def reset_to_initial_state(): + # получим информацию о кастомных модулей + response = requests.get(_ALL_MODULES_URL) + if response.status_code != 200: + manager_logger.warning(f"Не удалось получить информацию о кастомных модулях. Код: {response.status_code}") + return + + # получим переменные окружения + try: + all_envs, _, _ = get_env_vars() + except Exception as e: + manager_logger.warning(f"Не удалось получить переменные окружения: {e}") + return + + # проходимся по всем модулям (id, title, is_SIZ, status, model) + processed_modules = [] + custom_modules = response.json() + for module in custom_modules: + # инициализируем переменные контейнера + container_name = f"{all_envs['COMPOSE_PROJECT_NAME']}_custom_module{module['id']}" + # останавливаем контейнер в отдельном процессе + p = Process(target=stop_container, + args=( + container_name, + ) + ) + p.start() + processed_modules.append(p) + + # Завершаем все запущенные процессы + for p in processed_modules: + p.join() + +def create_custom_containers(): + # получим информацию о кастомных модулей + response = requests.get(_ALL_MODULES_URL) + if response.status_code != 200: + manager_logger.warning(f"Не удалось получить информацию о кастомных модулях. Код: {response.status_code}") + return + + # получим переменные окружения + try: + all_envs, build_args, run_envs = get_env_vars() + except Exception as e: + manager_logger.warning(f"Не удалось получить переменные окружения: {e}") + return + + # проходимся по всем модулям (id, title, is_SIZ, status, model) + processed_modules = [] + custom_modules = response.json() + print(custom_modules) + for module in custom_modules: + if module["status"] in ["остановлен", "не создан"] and module["model"]["weights"].strip(): + # инициализируем переменные контейнера + image_name = f"statanly/{all_envs['PROJECT_NAME']}/ml/custom_module{module['id']}/{all_envs['BUILD_NAME']}:latest" + container_name = f"{all_envs['COMPOSE_PROJECT_NAME']}_custom_module{module['id']}" + build_args["tag"] = image_name + # запускаем контейнер в отдельном процессе + p = Process(target=start_container, + args=( + image_name, + build_args, + container_name, + all_envs["GLOBAL_NET_NAME"], + run_envs, + ) + ) + p.start() + processed_modules.append(p) + + # Завершаем все запущенные процессы + for p in processed_modules: + p.join() + +def stop_all_custom_containers(): + # получим информацию о кастомных модулей + response = requests.get(_ALL_MODULES_URL) + if response.status_code != 200: + manager_logger.warning(f"Не удалось получить информацию о кастомных модулях. Код: {response.status_code}") + return + + # получим переменные окружения + try: + all_envs, _, _ = get_env_vars() + except Exception as e: + manager_logger.warning(f"Не удалось получить переменные окружения: {e}") + return + + # проходимся по всем модулям (id, title, is_SIZ, status, model) + processed_modules = [] + custom_modules = response.json() + for module in custom_modules: + if module["status"] in ["работает"]: + # инициализируем переменные контейнера + container_name = f"{all_envs['COMPOSE_PROJECT_NAME']}_custom_module{module['id']}" + # останавливаем контейнер в отдельном процессе + p = Process(target=stop_container, + args=( + container_name, + ) + ) + p.start() + processed_modules.append(p) + + # Завершаем все запущенные процессы + for p in processed_modules: + p.join() \ No newline at end of file diff --git a/master-node/src/custom_modules/manager_methods.py b/master-node/src/custom_modules/manager_methods.py new file mode 100644 index 0000000..3289c23 --- /dev/null +++ b/master-node/src/custom_modules/manager_methods.py @@ -0,0 +1,79 @@ +import docker + +from ..logger import LoggerFactory + +_ALL_MODULES_URL = "https://api.statanly.com:8443/api/custom-modules" +manager_logger = LoggerFactory.get_logger("CustomModuleManager") + +def build_image( + client: docker.DockerClient, + path: str, + dockerfile: str, + tag: str, + buildargs: dict +): + """Сборка docker контейнеров""" + _, logs = client.images.build( + path = path, + dockerfile = dockerfile, + tag = tag, + buildargs = buildargs, + decode = True + ) + for chunk in logs: + if 'stream' in chunk: + manager_logger.info(chunk['stream'].strip()) + +def start_container( + image_name: str, + build_args: dict, + container_name: str, + network_name: str, + env_vars: dict, + gpus = True, + detach = True +): + """Запуск docker контейнеров""" + # инициализируем клиента + client = docker.from_env() + + # Соберем контейнер + try: + build_image(client, **build_args) + manager_logger.info(f"Контейнер '{container_name}' успешно собран") + except Exception as e: + manager_logger.error(f"Ошибка при сборке контейнера: {e}") + return + + # Укажем устройства исполнения + device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])] if gpus else None + + # Запустим контейнер + try: + container = client.containers.run( + image = image_name, + name = container_name, + network = client.networks.get(network_name), + environment = env_vars, + device_requests=device_requests, + detach = detach, + ) + manager_logger.info(f"Контейнер '{container_name}' с ID: {container.short_id} запустился") + except Exception as e: + manager_logger.error(f"Ошибка при запуске контейнера: {e}") + +def stop_container( + name: str +): + """Удаление docker контейнеров по имени""" + # инициализируем клиента + client = docker.from_env() + + # Остановим контейнер + try: + container = client.containers.get(name) + container.stop() + container.remove() + manager_logger.info(f"Контейнер '{name}' был остановлен") + except docker.errors.NotFound: + manager_logger.warning(f"Контейнер '{name}' не найден") \ No newline at end of file