added files to master-node related to custom modules
This commit is contained in:
parent
a64c004e89
commit
9632552cff
5 changed files with 313 additions and 29 deletions
|
|
@ -7,7 +7,7 @@ from flask import (
|
||||||
flash,
|
flash,
|
||||||
request,
|
request,
|
||||||
redirect,
|
redirect,
|
||||||
url_for,
|
url_for,
|
||||||
render_template,
|
render_template,
|
||||||
jsonify,
|
jsonify,
|
||||||
make_response,
|
make_response,
|
||||||
|
|
@ -18,62 +18,73 @@ from pydantic import ValidationError
|
||||||
from werkzeug.utils import secure_filename
|
from werkzeug.utils import secure_filename
|
||||||
from werkzeug.security import generate_password_hash, check_password_hash
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||||||
|
|
||||||
|
from logger import LoggerFactory
|
||||||
|
from cluster import Cluster
|
||||||
|
from cluster_state import AutoState
|
||||||
|
from custom_modules import reset_to_initial_state, create_custom_containers, stop_all_custom_containers
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Проинициализируем константы
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
ABS_PATH = os.path.dirname(os.path.realpath(__file__))
|
ABS_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||||
MAX_CONTENT_PATH = 1000000000
|
MAX_CONTENT_PATH = 1000000000
|
||||||
CONFIG = os.path.join(ABS_PATH, "config.ini")
|
CONFIG = os.path.join(ABS_PATH, "config.ini")
|
||||||
SCHEDULER_API_ENABLED = True
|
SCHEDULER_API_ENABLED = True
|
||||||
SHOW_LOG = True
|
SHOW_LOG = True
|
||||||
|
|
||||||
from logger import LoggerFactory
|
|
||||||
|
|
||||||
|
|
||||||
LoggerFactory.setting(
|
LoggerFactory.setting(
|
||||||
log_level=os.getenv("LOG_LEVEL", "INFO"),
|
log_level=os.getenv("LOG_LEVEL", "INFO"),
|
||||||
log_format="[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s",
|
log_format="[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s",
|
||||||
show=True,
|
show=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
from cluster import Cluster
|
|
||||||
from cluster_state import AutoState
|
|
||||||
|
|
||||||
|
|
||||||
app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates"))
|
|
||||||
app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH
|
|
||||||
app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED
|
|
||||||
app.config["SESSION_TYPE"] = "filesystem"
|
|
||||||
|
|
||||||
auth = HTTPBasicAuth()
|
auth = HTTPBasicAuth()
|
||||||
scheduler = APScheduler()
|
scheduler = APScheduler()
|
||||||
app_logger = LoggerFactory.get_logger("APP")
|
app_logger = LoggerFactory.get_logger("APP")
|
||||||
AUTO = AutoState(debug=False)
|
AUTO = AutoState(debug=False)
|
||||||
|
|
||||||
|
|
||||||
# @scheduler.task("interval", id="cluster_state", seconds=30, misfire_grace_time=900)
|
|
||||||
# def cluster_state():
|
|
||||||
# AUTO.check_cluster_state()
|
|
||||||
# # app_logger.debug("Finished with auto cluster state")
|
|
||||||
|
|
||||||
|
|
||||||
def get_config() -> dict:
    """Read the ``[API]`` section of config.ini and return it as a plain dict.

    :return: mapping of option name -> value from the ``API`` section
    """
    parser = configparser.ConfigParser()
    parser.read(CONFIG)
    return dict(parser.items("API"))
||||||
|
|
||||||
|
|
||||||
USER = get_config()["user"]
|
USER = get_config()["user"]
|
||||||
PASSWORD = get_config()["password"]
|
PASSWORD = get_config()["password"]
|
||||||
|
|
||||||
users = {
|
users = {
|
||||||
USER: generate_password_hash(PASSWORD),
|
USER: generate_password_hash(PASSWORD),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@auth.verify_password
def verify_password(username, password):
    """HTTPBasicAuth callback: return the username on valid credentials.

    Returns ``None`` implicitly (auth failure) for unknown users or a
    non-matching password hash.
    """
    stored_hash = users.get(username)
    if stored_hash is not None and check_password_hash(stored_hash, password):
        return username
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Создадим приложение
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
|
app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates"))
|
||||||
|
app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH
|
||||||
|
app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED
|
||||||
|
app.config["SESSION_TYPE"] = "filesystem"
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Настроим хуки и ручки приложения
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
|
@app.before_first_request
def startup():
    """Reset the system to its initial state before the first request is served.

    NOTE(review): ``before_first_request`` is deprecated and removed in
    Flask 2.3 — confirm the pinned Flask version still supports it.
    """
    # Bring the system to its initial state (stops leftover custom containers).
    reset_to_initial_state()
    app_logger.info("master-node запущена!")
|
||||||
|
|
||||||
|
@app.teardown_appcontext
def shutdown(exception=None):
    """Stop every custom-module container when the app context is torn down.

    NOTE(review): ``teardown_appcontext`` fires after *every* request's app
    context pops, not only at process exit — as written this stops all custom
    containers after each request. Confirm this is intended; ``atexit`` or a
    signal handler is the usual place for one-time shutdown cleanup.
    """
    # Stop all custom-module containers.
    stop_all_custom_containers()
    app_logger.info("master-node остановлена!")
|
||||||
|
|
||||||
# curl -u <user>:<pass> -d "playbook=ping_workers&args=hosts=workers" -v http://localhost:5010/api/v1.0/run_ansible
|
# curl -u <user>:<pass> -d "playbook=ping_workers&args=hosts=workers" -v http://localhost:5010/api/v1.0/run_ansible
|
||||||
@app.route("/api/v1.0/run_ansible", methods=["POST"])
|
@app.route("/api/v1.0/run_ansible", methods=["POST"])
|
||||||
|
|
@ -100,8 +111,30 @@ def run_ansible():
|
||||||
app_logger.error(data)
|
app_logger.error(data)
|
||||||
return make_response(jsonify(data), 400)
|
return make_response(jsonify(data), 400)
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# Методы, которыми управляет scheduler
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
def run_app():
|
#@scheduler.task(
|
||||||
|
# "interval",
|
||||||
|
# id="cluster_state",
|
||||||
|
# seconds=30,
|
||||||
|
# misfire_grace_time=900
|
||||||
|
#)
|
||||||
|
#def cluster_state():
|
||||||
|
# AUTO.check_cluster_state()
|
||||||
|
# app_logger.debug("Finished with auto cluster state")
|
||||||
|
|
||||||
|
#@scheduler.task(
|
||||||
|
# "interval",
|
||||||
|
# id="cluster_state",
|
||||||
|
# seconds=30,
|
||||||
|
# misfire_grace_time=900
|
||||||
|
#)
|
||||||
|
#def custom_modules():
|
||||||
|
# create_custom_containers()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
port = int(os.environ.get("PORT", 5010))
|
port = int(os.environ.get("PORT", 5010))
|
||||||
app_logger.info(ABS_PATH)
|
app_logger.info(ABS_PATH)
|
||||||
app.secret_key = get_config()["key"]
|
app.secret_key = get_config()["key"]
|
||||||
|
|
@ -112,7 +145,3 @@ def run_app():
|
||||||
app.run(debug=True, host="0.0.0.0", port=port)
|
app.run(debug=True, host="0.0.0.0", port=port)
|
||||||
elif mode == "prod":
|
elif mode == "prod":
|
||||||
app.run(port=port)
|
app.run(port=port)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
run_app()
|
|
||||||
|
|
|
||||||
1
master-node/src/custom_modules/__init__.py
Normal file
1
master-node/src/custom_modules/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
from .manager import reset_to_initial_state, create_custom_containers, stop_all_custom_containers
|
||||||
56
master-node/src/custom_modules/get_env_vars.py
Normal file
56
master-node/src/custom_modules/get_env_vars.py
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
def load_env_files(files: list) -> dict:
    """Extract environment variables from dotenv-style files.

    Each file is read line by line; blank lines and ``#`` comments are
    skipped, and the remaining lines are split on the first ``=`` into
    key/value pairs. Keys from later files override earlier ones.

    :param files: paths of the env files to read
    :return: merged mapping of variable name -> value
    :raises OSError: if one of the files cannot be opened
    """
    env = {}
    for file in files:
        with open(file) as f:
            for line in f:
                line = line.strip()
                # Skip blanks, comments, and malformed lines without '='
                # (the original split raised ValueError on the latter).
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                env[key] = value
    return env
|
||||||
|
|
||||||
|
def get_env_vars():
    """Collect env vars and container build/run arguments for custom modules.

    Reads the ML code base's dotenv files and derives:
      * ``all_envs``  — merged environment variables from the env files,
      * ``build_args`` — kwargs for building the custom-module image
        (``tag`` is left empty and filled in per module by the caller),
      * ``run_envs``  — environment passed to the running container.

    :return: tuple ``(all_envs, build_args, run_envs)``
    :raises OSError: if one of the env files cannot be read
    :raises KeyError: if a required variable is missing from the env files
    """
    # Check that the code base is present.
    # TODO: ansible playbook
    ml_path = "/path-to-ml"

    # Pull the variables (later files override earlier ones).
    env_files = [f"{ml_path}/.env", f"{ml_path}/docker/.env.dev", f"{ml_path}/docker/.env.compose_vars"]
    all_envs = load_env_files(env_files)

    # Arguments for building the container image.
    build_args = {
        "path": f"{ml_path}/src_ml/",
        "dockerfile": "docker/custom_modules_api/Dockerfile",
        "tag": "",  # filled in per module by the caller
        "buildargs": {
            "CUDA_VERSION": all_envs["CUDA_VERSION"],
            "TORCH_VERSION": all_envs["TORCH_VERSION"]
        }
    }
    # Environment for running the container.
    run_envs = {
        "API_HOST": '0.0.0.0',
        "API_PORT": 8000,
        # .get() so a missing/empty value falls back instead of raising.
        "PROCESSED_INSTANCE_LOG_PERIOD": all_envs.get('PROCESSED_INSTANCE_LOG_PERIOD') or 60,
        "ALL_BOOTSTRAP_SERVERS": f"{all_envs['INTERNAL_BROKER_NAME']}:9090",
        "VIDEO_FRAMES_BOOTSTRAP_SERVERS": f"{all_envs['INTERNAL_BROKER_NAME']}:9090",
        "VIDEO_FRAMES_SCHEMA_REGISTRY_URL": f"http://{all_envs['INTERNAL_SCHEMA_REGISTRY_NAME']}:8081",
        "ML_RESULT_BOOTSTRAP_SERVERS": f"{all_envs['INTERNAL_BROKER_NAME']}:9090",
        "ML_RESULT_SCHEMA_REGISTRY_URL": f"http://{all_envs['INTERNAL_SCHEMA_REGISTRY_NAME']}:8081",
        # Settings-database (Redis) connection.
        "REMOTE_SETTINGS_REDIS_SERVER_HOST": all_envs['ML_SETTINGS_REDIS_NAME'],
        "REMOTE_SETTINGS_REDIS_SERVER_PORT": all_envs.get('ML_SETTINGS_REDIS_PORT') or 6379,
        # BUG FIX: all_envs['KEY', 'default'] indexed the dict with a *tuple*
        # (always KeyError) — dict.get(key, default) is what was intended.
        "REMOTE_SETTINGS_REDIS_SERVER_PASSWORD": all_envs.get('ML_SETTINGS_REDIS_PASSWORD', 'password123'),
        "REMOTE_SETTINGS_MODEL_CONTEXT_PREFIX": 'models_',
        # MinIO access for model-weight storage.
        "WEIGHTS_MINIO_SERVERS": f"{all_envs['FRAME_MINIO_HOST']}:{all_envs['FRAME_MINIO_PORT']}",
        "WEIGHTS_MINIO_USER": all_envs.get('MINIO_ROOT_USER', 'minioadmin'),
        "WEIGHTS_MINIO_PASSWORD": all_envs.get('MINIO_ROOT_PASSWORD', 'minioadmin')
    }

    return all_envs, build_args, run_envs
|
||||||
119
master-node/src/custom_modules/manager.py
Normal file
119
master-node/src/custom_modules/manager.py
Normal file
|
|
@ -0,0 +1,119 @@
|
||||||
|
from multiprocessing import Process
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from .manager_methods import start_container, stop_container
|
||||||
|
from .get_env_vars import get_env_vars
|
||||||
|
from ..logger import LoggerFactory
|
||||||
|
|
||||||
|
_AUTH_URL = "https://api.statanly.com:8443/api/auth/login"
|
||||||
|
_ALL_MODULES_URL = "https://api.statanly.com:8443/api/custom-modules"
|
||||||
|
manager_logger = LoggerFactory.get_logger("CustomModuleManager")
|
||||||
|
|
||||||
|
|
||||||
|
def reset_to_initial_state():
    """Bring the system to its initial state: stop every custom-module container.

    Fetches the list of custom modules from the backend API and stops the
    corresponding container of each one (regardless of its reported status)
    in a separate process, then waits for all the processes to finish.
    Logs a warning and returns early when the module list or the environment
    variables cannot be obtained.
    """
    # Fetch information about the custom modules.
    # A timeout keeps startup from hanging forever on an unreachable API.
    response = requests.get(_ALL_MODULES_URL, timeout=30)
    if response.status_code != 200:
        manager_logger.warning(f"Не удалось получить информацию о кастомных модулях. Код: {response.status_code}")
        return

    # Fetch the environment variables.
    try:
        all_envs, _, _ = get_env_vars()
    except Exception as e:
        manager_logger.warning(f"Не удалось получить переменные окружения: {e}")
        return

    # Walk over all modules (id, title, is_SIZ, status, model).
    processed_modules = []
    custom_modules = response.json()
    for module in custom_modules:
        # Container name for this module.
        container_name = f"{all_envs['COMPOSE_PROJECT_NAME']}_custom_module{module['id']}"
        # Stop the container in a separate process.
        p = Process(target=stop_container, args=(container_name,))
        p.start()
        processed_modules.append(p)

    # Wait for every spawned process to finish.
    for p in processed_modules:
        p.join()
|
||||||
|
|
||||||
|
def create_custom_containers():
    """Build and start containers for custom modules that are not yet running.

    Fetches the custom modules from the backend API and, for every module
    that is stopped/not-created and has model weights assigned, builds its
    image and starts its container in a separate process, then waits for
    all the processes to finish. Logs a warning and returns early when the
    module list or the environment variables cannot be obtained.
    """
    # Fetch information about the custom modules (timeout: don't hang forever).
    response = requests.get(_ALL_MODULES_URL, timeout=30)
    if response.status_code != 200:
        manager_logger.warning(f"Не удалось получить информацию о кастомных модулях. Код: {response.status_code}")
        return

    # Fetch the environment variables.
    try:
        all_envs, build_args, run_envs = get_env_vars()
    except Exception as e:
        manager_logger.warning(f"Не удалось получить переменные окружения: {e}")
        return

    # Walk over all modules (id, title, is_SIZ, status, model).
    processed_modules = []
    custom_modules = response.json()
    # Use the module logger instead of the original stray debug print().
    manager_logger.debug(custom_modules)
    for module in custom_modules:
        # Status strings come from the API in Russian:
        # "остановлен" = stopped, "не создан" = not created.
        if module["status"] in ["остановлен", "не создан"] and module["model"]["weights"].strip():
            # Image/container identifiers for this module.
            image_name = f"statanly/{all_envs['PROJECT_NAME']}/ml/custom_module{module['id']}/{all_envs['BUILD_NAME']}:latest"
            container_name = f"{all_envs['COMPOSE_PROJECT_NAME']}_custom_module{module['id']}"
            build_args["tag"] = image_name
            # Start the container in a separate process. Pass a *copy* of
            # build_args so later loop iterations mutating "tag" cannot leak
            # into a process whose start is still pending.
            p = Process(
                target=start_container,
                args=(
                    image_name,
                    dict(build_args),
                    container_name,
                    all_envs["GLOBAL_NET_NAME"],
                    run_envs,
                ),
            )
            p.start()
            processed_modules.append(p)

    # Wait for every spawned process to finish.
    for p in processed_modules:
        p.join()
|
||||||
|
|
||||||
|
def stop_all_custom_containers():
    """Stop the containers of all custom modules that are currently running.

    Fetches the custom modules from the backend API and stops the container
    of every module whose status is "работает" (running), each in a separate
    process, then waits for all the processes to finish. Logs a warning and
    returns early when the module list or the environment variables cannot
    be obtained.
    """
    # Fetch information about the custom modules.
    # A timeout keeps app teardown from hanging forever on an unreachable API.
    response = requests.get(_ALL_MODULES_URL, timeout=30)
    if response.status_code != 200:
        manager_logger.warning(f"Не удалось получить информацию о кастомных модулях. Код: {response.status_code}")
        return

    # Fetch the environment variables.
    try:
        all_envs, _, _ = get_env_vars()
    except Exception as e:
        manager_logger.warning(f"Не удалось получить переменные окружения: {e}")
        return

    # Walk over all modules (id, title, is_SIZ, status, model).
    processed_modules = []
    custom_modules = response.json()
    for module in custom_modules:
        # "работает" == running (status strings come from the API in Russian).
        if module["status"] in ["работает"]:
            container_name = f"{all_envs['COMPOSE_PROJECT_NAME']}_custom_module{module['id']}"
            # Stop the container in a separate process.
            p = Process(target=stop_container, args=(container_name,))
            p.start()
            processed_modules.append(p)

    # Wait for every spawned process to finish.
    for p in processed_modules:
        p.join()
|
||||||
79
master-node/src/custom_modules/manager_methods.py
Normal file
79
master-node/src/custom_modules/manager_methods.py
Normal file
|
|
@ -0,0 +1,79 @@
|
||||||
|
import docker
|
||||||
|
|
||||||
|
from ..logger import LoggerFactory
|
||||||
|
|
||||||
|
_ALL_MODULES_URL = "https://api.statanly.com:8443/api/custom-modules"
|
||||||
|
manager_logger = LoggerFactory.get_logger("CustomModuleManager")
|
||||||
|
|
||||||
|
def build_image(
    client: docker.DockerClient,
    path: str,
    dockerfile: str,
    tag: str,
    buildargs: dict
):
    """Build a docker image and stream its build log to the module logger.

    :param client: docker client to build with
    :param path: build-context directory
    :param dockerfile: Dockerfile path relative to the context
    :param tag: tag to assign to the built image
    :param buildargs: values for the Dockerfile ``ARG`` instructions
    """
    _, build_log = client.images.build(
        path=path,
        dockerfile=dockerfile,
        tag=tag,
        buildargs=buildargs,
        decode=True,
    )
    # The decoded log is a sequence of dicts; only 'stream' entries carry text.
    for entry in build_log:
        if 'stream' in entry:
            manager_logger.info(entry['stream'].strip())
|
||||||
|
|
||||||
|
def start_container(
    image_name: str,
    build_args: dict,
    container_name: str,
    network_name: str,
    env_vars: dict,
    gpus = True,
    detach = True
):
    """Build an image and run a docker container from it.

    Build or run failures are logged (not raised), since this function is
    executed inside a child ``multiprocessing.Process``.

    :param image_name: tag of the image to run
    :param build_args: kwargs for :func:`build_image`
    :param container_name: name to assign to the container
    :param network_name: name of the docker network to attach to
    :param env_vars: environment passed into the container
    :param gpus: when True, request all available GPUs
    :param detach: run the container in the background
    """
    # Each worker process creates its own docker client.
    client = docker.from_env()

    # Build the image.
    try:
        build_image(client, **build_args)
        manager_logger.info(f"Контейнер '{container_name}' успешно собран")
    except Exception as e:
        manager_logger.error(f"Ошибка при сборке контейнера: {e}")
        return

    # Request every GPU when enabled.
    device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])] if gpus else None

    # Run the container.
    try:
        container = client.containers.run(
            image=image_name,
            name=container_name,
            # BUG FIX: docker-py expects the network *name* (str) here; the
            # original passed the Network object from client.networks.get(),
            # which containers.run() does not accept.
            network=network_name,
            environment=env_vars,
            device_requests=device_requests,
            detach=detach,
        )
        manager_logger.info(f"Контейнер '{container_name}' с ID: {container.short_id} запустился")
    except Exception as e:
        manager_logger.error(f"Ошибка при запуске контейнера: {e}")
|
||||||
|
|
||||||
|
def stop_container(
    name: str
):
    """Stop and remove a docker container by name.

    Logs a warning instead of raising when no such container exists; runs
    inside a child ``multiprocessing.Process``, so it creates its own client.

    :param name: name of the container to stop and remove
    """
    client = docker.from_env()

    # Stop (and remove) the container.
    try:
        target = client.containers.get(name)
        target.stop()
        target.remove()
        manager_logger.info(f"Контейнер '{name}' был остановлен")
    except docker.errors.NotFound:
        manager_logger.warning(f"Контейнер '{name}' не найден")
|
||||||
Loading…
Reference in a new issue