devops-configs/master-node/src/app.py
2025-10-25 16:17:17 +03:00

190 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
import configparser
import datetime
from flask import (
Flask,
flash,
request,
redirect,
url_for,
render_template,
jsonify,
make_response,
)
from flask_httpauth import HTTPBasicAuth
from flask_apscheduler import APScheduler
from pydantic import ValidationError
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from logger import LoggerFactory
from cluster import Cluster
from cluster_state import AutoState
from custom_modules import reset_to_initial_state, create_custom_containers, stop_all_custom_containers
# --------------------------
# Проинициализируем константы
# --------------------------
ABS_PATH = os.path.dirname(os.path.realpath(__file__))
MAX_CONTENT_PATH = 1000000000
CONFIG = os.path.join(ABS_PATH, "config.ini")
SCHEDULER_API_ENABLED = True
SHOW_LOG = True
LoggerFactory.setting(
log_level=os.getenv("LOG_LEVEL", "INFO"),
log_format="[%(asctime)s] %(levelname)s (%(name)s - %(funcName)s): %(message)s",
show=True,
)
auth = HTTPBasicAuth()
scheduler = APScheduler()
app_logger = LoggerFactory.get_logger("APP")
AUTO = AutoState(debug=False)
def get_config() -> dict:
config = configparser.ConfigParser()
config.read(CONFIG)
auth_data = dict(config.items("API"))
return auth_data
USER = get_config()["user"]
PASSWORD = get_config()["password"]
users = {
USER: generate_password_hash(PASSWORD),
}
@auth.verify_password
def verify_password(username, password):
if username in users and check_password_hash(users.get(username), password):
return username
# --------------------------
# Создадим приложение
# --------------------------
app = Flask(__name__, template_folder=os.path.join(ABS_PATH, "templates"))
app.config["MAX_CONTENT_PATH"] = MAX_CONTENT_PATH
app.config["SCHEDULER_API_ENABLED"] = SCHEDULER_API_ENABLED
app.config["SESSION_TYPE"] = "filesystem"
# --------------------------
# Настроим хуки и ручки приложения
# --------------------------
@app.before_first_request
def startup():
# Приведем систему в начальное состояние
# TODO: запуск плейбука, который проверит кодовую базу
# желательно, чтобы он скопировал код и указал путь к корневой директории
# в переменной окружения ML_PATH
reset_to_initial_state()
app_logger.info("master-node запущена!")
@app.teardown_appcontext
def shutdown(exception=None):
# Остановим все контейнеры
stop_all_custom_containers()
app_logger.info("master-node остановлена!")
# curl -u <user>:<pass> -d "playbook=ping_workers&args=hosts=workers" -v http://localhost:5010/api/v1.0/run_ansible
@app.route("/api/v1.0/run_ansible", methods=["POST"])
@auth.login_required
def run_ansible():
try:
cluster = Cluster(CONFIG, ABS_PATH, use_key=False)
playbook = request.form.get("playbook")
playbook_args = request.form.get("args")
if playbook and playbook_args:
command = f"""ansible-playbook --extra-vars "{playbook_args}" playbooks/{playbook}.yml"""
state = cluster.run_ansible_command(command, use_configs=True)
else:
state = "No playbook and playbook args given"
time = datetime.datetime.now()
data = {
"message": state,
"code": "SUCCESS",
"insertTimestamp": time,
}
return make_response(jsonify(data), 200)
except Exception as e:
data = {"message": f"Run ansible failed with error: {e}", "code": "FAILED"}
app_logger.error(data)
return make_response(jsonify(data), 400)
@app.route('/api/v1.0/interact_with_custom_modules', methods=['POST'])
@auth.login_required
def interact_with_custom_modules():
# Получим данные запроса
data = request.get_json()
# TODO: настроить адрес бекенда
back_url = "https://api.statanly.com:8443"
try:
# TODO: получим токен авторизации
token = requests.post(
f"{back_url}/api/auth/login",
data = {"username": "admin@eatom.ru", "password": "admin"}
).json()['access_token']
# выполним необходимую операция с кастомными модулями
if data["request_type"] == "get_all_modules":
response = requests.get(
f"{back_url}/api/custom-modules",
headers = {"Authorization": f"Bearer {token}"}
)
elif data["request_type"] == "change_status":
response = requests.patch(
f"{back_url}/api/custom-modules/{data['module_id']}",
json = {"status": data["status"]}
)
data = {
"response": response.json(),
"message": "",
"code": "SUCCESS"
}
return make_response(jsonify(data), 200)
except Exception as e:
data = {
"response": "",
"message": f"Cannot interact with custom modules: {e}",
"code": "FAILED"
}
return make_response(jsonify(data), 400)
# --------------------------
# Методы, которыми управляет scheduler
# --------------------------
#@scheduler.task(
# "interval",
# id="cluster_state",
# seconds=30,
# misfire_grace_time=900
#)
#def cluster_state():
# AUTO.check_cluster_state()
# app_logger.debug("Finished with auto cluster state")
scheduler.task(
"interval",
id="cluster_state",
seconds=30,
misfire_grace_time=900
)(create_custom_containers)
if __name__ == "__main__":
port = int(os.environ.get("PORT", 5010))
app_logger.info(ABS_PATH)
app.secret_key = get_config()["key"]
mode = get_config()["mode"]
scheduler.init_app(app)
scheduler.start()
if mode == "debug":
app.run(debug=True, host="0.0.0.0", port=port)
elif mode == "prod":
app.run(port=port)