File _service:obs_scm:add-collect-module-to-sysSentry.patch of Package sysSentry
From bd32dc01000126d593c188d47404cfdbe1df343e Mon Sep 17 00:00:00 2001
From: zhuofeng <zhuofeng2@huawei.com>
Date: Thu, 12 Sep 2024 11:29:01 +0800
Subject: [PATCH 1/2] add collect module to sysSentry

---
 config/collector.conf                        |   7 +
 service/sentryCollector.service              |  12 +
 service/sysSentry.service                    |   2 +-
 src/python/sentryCollector/__init__.py       |   0
 src/python/sentryCollector/__main__.py       |  17 ++
 src/python/sentryCollector/collect_config.py | 118 ++++++++
 src/python/sentryCollector/collect_io.py     | 239 ++++++++++++++++
 src/python/sentryCollector/collect_plugin.py | 276 ++++++++++++++++++
 src/python/sentryCollector/collect_server.py | 285 +++++++++++++++++++
 src/python/sentryCollector/collectd.py       |  99 +++++++
 src/python/setup.py                          |   4 +-
 11 files changed, 1057 insertions(+), 2 deletions(-)
 create mode 100644 config/collector.conf
 create mode 100644 service/sentryCollector.service
 create mode 100644 src/python/sentryCollector/__init__.py
 create mode 100644 src/python/sentryCollector/__main__.py
 create mode 100644 src/python/sentryCollector/collect_config.py
 create mode 100644 src/python/sentryCollector/collect_io.py
 create mode 100644 src/python/sentryCollector/collect_plugin.py
 create mode 100644 src/python/sentryCollector/collect_server.py
 create mode 100644 src/python/sentryCollector/collectd.py

diff --git a/config/collector.conf b/config/collector.conf
new file mode 100644
index 0000000..9baa086
--- /dev/null
+++ b/config/collector.conf
@@ -0,0 +1,7 @@
+[common]
+modules=io
+
+[io]
+period_time=1
+max_save=10
+disk=default
\ No newline at end of file
diff --git a/service/sentryCollector.service b/service/sentryCollector.service
new file mode 100644
index 0000000..4ee07d5
--- /dev/null
+++ b/service/sentryCollector.service
@@ -0,0 +1,12 @@
+[Unit]
+Description = Collection module added for sysSentry and kernel lock-free collection
+
+[Service]
+ExecStart=/usr/bin/python3 /usr/bin/sentryCollector
+ExecStop=/bin/kill $MAINPID
+KillMode=process
+Restart=on-failure
+RestartSec=10s
+
+[Install]
+WantedBy = multi-user.target
diff --git a/service/sysSentry.service b/service/sysSentry.service
index 4d85a6c..1d8338f 100644
--- a/service/sysSentry.service
+++ b/service/sysSentry.service
@@ -2,7 +2,7 @@
 Description=EulerOS System Inspection Frame
 
 [Service]
-ExecStart=/usr/bin/syssentry
+ExecStart=/usr/bin/python3 /usr/bin/syssentry
 ExecStop=/bin/kill $MAINPID
 KillMode=process
 Restart=on-failure
diff --git a/src/python/sentryCollector/__init__.py b/src/python/sentryCollector/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/python/sentryCollector/__main__.py b/src/python/sentryCollector/__main__.py
new file mode 100644
index 0000000..9c2ae50
--- /dev/null
+++ b/src/python/sentryCollector/__main__.py
@@ -0,0 +1,17 @@
+# coding: utf-8
+# Copyright (c) 2023 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+ +""" +main +""" +from collectd import collectd + +collectd.main() diff --git a/src/python/sentryCollector/collect_config.py b/src/python/sentryCollector/collect_config.py new file mode 100644 index 0000000..b6cc75c --- /dev/null +++ b/src/python/sentryCollector/collect_config.py @@ -0,0 +1,118 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +Read and save collector.conf value. +""" +import configparser +import logging +import os +import re + + +COLLECT_CONF_PATH = "/etc/sysSentry/collector.conf" + +CONF_COMMON = 'common' +CONF_MODULES = 'modules' + +# io +CONF_IO = 'io' +CONF_IO_PERIOD_TIME = 'period_time' +CONF_IO_MAX_SAVE = 'max_save' +CONF_IO_DISK = 'disk' +CONF_IO_PERIOD_TIME_DEFAULT = 1 +CONF_IO_MAX_SAVE_DEFAULT = 10 +CONF_IO_DISK_DEFAULT = "default" + +class CollectConfig: + def __init__(self, filename=COLLECT_CONF_PATH): + + self.filename = filename + self.modules = [] + self.module_count = 0 + self.load_config() + + def load_config(self): + if not os.path.exists(self.filename): + logging.error("%s is not exists", self.filename) + return + + try: + self.config = configparser.ConfigParser() + self.config.read(self.filename) + except configparser.Error: + logging.error("collectd configure file read failed") + return + + try: + common_config = self.config[CONF_COMMON] + modules_str = common_config[CONF_MODULES] + # remove space + modules_list = modules_str.replace(" ", "").split(',') + except KeyError as e: + logging.error("read config data failed, %s", e) + return + + pattern = r'^[a-zA-Z0-9-_]+$' + for module_name in modules_list: + if not re.match(pattern, module_name): + logging.warning("module_name: %s is invalid", module_name) + continue + if not self.config.has_section(module_name): + logging.warning("module_name: %s config is incorrect", module_name) + continue + self.modules.append(module_name) + + def load_module_config(self, module_name): + module_name = module_name.strip().lower() + if module_name in self.modules and self.config.has_section(module_name): + return {key.lower(): value for key, value in self.config[module_name].items()} + else: + raise ValueError(f"Module '{module_name}' not found in configuration") + + def get_io_config(self): + result_io_config = {} + io_map_value = self.load_module_config(CONF_IO) + # period_time + period_time = io_map_value.get(CONF_IO_PERIOD_TIME) + if period_time and period_time.isdigit() and int(period_time) >= 1 and int(period_time) <= 300: + result_io_config[CONF_IO_PERIOD_TIME] = int(period_time) + else: + logging.warning("module_name = %s section, field = %s is incorrect, use default %d", + CONF_IO, CONF_IO_PERIOD_TIME, CONF_IO_PERIOD_TIME_DEFAULT) + result_io_config[CONF_IO_PERIOD_TIME] = CONF_IO_PERIOD_TIME_DEFAULT + # max_save + max_save = io_map_value.get(CONF_IO_MAX_SAVE) + if max_save and max_save.isdigit() and int(max_save) >= 1 and int(max_save) <= 300: + result_io_config[CONF_IO_MAX_SAVE] = int(max_save) + else: + logging.warning("module_name = %s section, field = %s is incorrect, use default %d", + CONF_IO, 
CONF_IO_MAX_SAVE, CONF_IO_MAX_SAVE_DEFAULT) + result_io_config[CONF_IO_MAX_SAVE] = CONF_IO_MAX_SAVE_DEFAULT + # disk + disk = io_map_value.get(CONF_IO_DISK) + if disk: + disk_str = disk.replace(" ", "") + pattern = r'^[a-zA-Z0-9-_,]+$' + if not re.match(pattern, disk_str): + logging.warning("module_name = %s section, field = %s is incorrect, use default %s", + CONF_IO, CONF_IO_DISK, CONF_IO_DISK_DEFAULT) + disk_str = CONF_IO_DISK_DEFAULT + result_io_config[CONF_IO_DISK] = disk_str + else: + logging.warning("module_name = %s section, field = %s is incorrect, use default %s", + CONF_IO, CONF_IO_DISK, CONF_IO_DISK_DEFAULT) + result_io_config[CONF_IO_DISK] = CONF_IO_DISK_DEFAULT + logging.info("config get_io_config: %s", result_io_config) + return result_io_config + + def get_common_config(self): + return {key.lower(): value for key, value in self.config['common'].items()} diff --git a/src/python/sentryCollector/collect_io.py b/src/python/sentryCollector/collect_io.py new file mode 100644 index 0000000..b826dc4 --- /dev/null +++ b/src/python/sentryCollector/collect_io.py @@ -0,0 +1,239 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +collect module +""" +import os +import time +import logging +import threading + +from .collect_config import CollectConfig + +Io_Category = ["read", "write", "flush", "discard"] +IO_GLOBAL_DATA = {} +IO_CONFIG_DATA = [] + +class IoStatus(): + TOTAL = 0 + FINISH = 1 + LATENCY = 2 + +class CollectIo(): + + def __init__(self, module_config): + + io_config = module_config.get_io_config() + + self.period_time = io_config['period_time'] + self.max_save = io_config['max_save'] + disk_str = io_config['disk'] + + self.disk_map_stage = {} + self.window_value = {} + + self.loop_all = False + + if disk_str == "default": + self.loop_all = True + else: + self.disk_list = disk_str.strip().split(',') + + self.stop_event = threading.Event() + + IO_CONFIG_DATA.append(self.period_time) + IO_CONFIG_DATA.append(self.max_save) + + def get_blk_io_hierarchy(self, disk_name, stage_list): + stats_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/stats'.format(disk_name) + try: + with open(stats_file, 'r') as file: + lines = file.read() + except FileNotFoundError: + logging.error("The file %s does not exist", stats_file) + return -1 + except Exception as e: + logging.error("An error occurred3: %s", e) + return -1 + + curr_value = lines.strip().split('\n') + + for stage_val in curr_value: + stage = stage_val.split(' ')[0] + if (len(self.window_value[disk_name][stage])) >= 2: + self.window_value[disk_name][stage].pop(0) + + curr_stage_value = stage_val.split(' ')[1:-1] + self.window_value[disk_name][stage].append(curr_stage_value) + return 0 + + def append_period_lat(self, disk_name, stage_list): + for stage in stage_list: + if len(self.window_value[disk_name][stage]) < 2: + return + curr_stage_value = self.window_value[disk_name][stage][-1] + last_stage_value = self.window_value[disk_name][stage][-2] + + for index in range(len(Io_Category)): + # read=0, write=1, flush=2, discard=3 + if 
(len(IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save: + IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].pop() + + curr_lat = self.get_latency_value(curr_stage_value, last_stage_value, index) + curr_iops = self.get_iops(curr_stage_value, last_stage_value, index) + curr_io_length = self.get_io_length(curr_stage_value, last_stage_value, index) + curr_io_dump = self.get_io_dump(disk_name, stage, index) + + IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops]) + + def get_iops(self, curr_stage_value, last_stage_value, category): + try: + finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH]) + except ValueError as e: + logging.error("get_iops convert to int failed, %s", e) + return 0 + value = finish / self.period_time + if value.is_integer(): + return int(value) + else: + return round(value, 1) + + def get_latency_value(self, curr_stage_value, last_stage_value, category): + try: + finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH]) + lat_time = (int(curr_stage_value[category * 3 + IoStatus.LATENCY]) - int(last_stage_value[category * 3 + IoStatus.LATENCY])) + except ValueError as e: + logging.error("get_latency_value convert to int failed, %s", e) + return 0 + if finish <= 0 or lat_time <= 0: + return 0 + value = lat_time / finish / 1000 / 1000 + if value.is_integer(): + return int(value) + else: + return round(value, 1) + + def get_io_length(self, curr_stage_value, last_stage_value, category): + try: + finish = int(curr_stage_value[category * 3 + IoStatus.FINISH]) - int(last_stage_value[category * 3 + IoStatus.FINISH]) + except ValueError as e: + logging.error("get_io_length convert to int failed, %s", e) + return 0 + value = finish / self.period_time / 1000 / 1000 + if value.is_integer(): + return int(value) + else: + return round(value, 1) + + def get_io_dump(self, disk_name, stage, category): + io_dump_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/io_dump'.format(disk_name, stage) + count = 0 + try: + with open(io_dump_file, 'r') as file: + for line in file: + count += line.count('.op=' + Io_Category[category]) + except FileNotFoundError: + logging.error("The file %s does not exist.", io_dump_file) + return count + except Exception as e: + logging.error("An error occurred1: %s", e) + return count + return count + + def extract_first_column(self, file_path): + column_names = [] + try: + with open(file_path, 'r') as file: + for line in file: + parts = line.strip().split() + if parts: + column_names.append(parts[0]) + except FileNotFoundError: + logging.error("The file %s does not exist.", file_path) + except Exception as e: + logging.error("An error occurred2: %s", e) + return column_names + + def task_loop(self): + if self.stop_event.is_set(): + logging.info("collect io thread exit") + return + + for disk_name, stage_list in self.disk_map_stage.items(): + if self.get_blk_io_hierarchy(disk_name, stage_list) < 0: + continue + self.append_period_lat(disk_name, stage_list) + + threading.Timer(self.period_time, self.task_loop).start() + + def main_loop(self): + logging.info("collect io thread start") + base_path = '/sys/kernel/debug/block' + for disk_name in os.listdir(base_path): + if not self.loop_all and disk_name not in self.disk_list: + continue + + disk_path = os.path.join(base_path, disk_name) + blk_io_hierarchy_path = os.path.join(disk_path, 'blk_io_hierarchy') 
+ + if not os.path.exists(blk_io_hierarchy_path): + logging.error("no blk_io_hierarchy directory found in %s, skipping.", disk_name) + continue + + for file_name in os.listdir(blk_io_hierarchy_path): + file_path = os.path.join(blk_io_hierarchy_path, file_name) + + if file_name == 'stats': + stage_list = self.extract_first_column(file_path) + self.disk_map_stage[disk_name] = stage_list + self.window_value[disk_name] = {} + IO_GLOBAL_DATA[disk_name] = {} + + if len(self.disk_map_stage) == 0: + logging.warning("no disks meet the requirements. the thread exits") + return + + for disk_name, stage_list in self.disk_map_stage.items(): + for stage in stage_list: + self.window_value[disk_name][stage] = [] + IO_GLOBAL_DATA[disk_name][stage] = {} + for category in Io_Category: + IO_GLOBAL_DATA[disk_name][stage][category] = [] + + while True: + start_time = time.time() + + if self.stop_event.is_set(): + logging.info("collect io thread exit") + return + + for disk_name, stage_list in self.disk_map_stage.items(): + if self.get_blk_io_hierarchy(disk_name, stage_list) < 0: + continue + self.append_period_lat(disk_name, stage_list) + + elapsed_time = time.time() - start_time + sleep_time = self.period_time - elapsed_time + if sleep_time < 0: + continue + while sleep_time > 1: + if self.stop_event.is_set(): + logging.info("collect io thread exit") + return + time.sleep(1) + sleep_time -= 1 + time.sleep(sleep_time) + + # set stop event, notify thread exit + def stop_thread(self): + logging.info("collect io thread is preparing to exit") + self.stop_event.set() diff --git a/src/python/sentryCollector/collect_plugin.py b/src/python/sentryCollector/collect_plugin.py new file mode 100644 index 0000000..49ce0a8 --- /dev/null +++ b/src/python/sentryCollector/collect_plugin.py @@ -0,0 +1,276 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +collcet plugin +""" +import json +import socket +import logging +import re + +COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock" + +# data length param +CLT_MSG_HEAD_LEN = 9 #3+2+4 +CLT_MSG_PRO_LEN = 2 +CLT_MSG_MAGIC_LEN = 3 +CLT_MSG_LEN_LEN = 4 + +CLT_MAGIC = "CLT" +RES_MAGIC = "RES" + +# disk limit +LIMIT_DISK_CHAR_LEN = 32 +LIMIT_DISK_LIST_LEN = 10 + +# stage limit +LIMIT_STAGE_CHAR_LEN = 20 +LIMIT_STAGE_LIST_LEN = 15 + +#iotype limit +LIMIT_IOTYPE_CHAR_LEN = 7 +LIMIT_IOTYPE_LIST_LEN = 4 + +#period limit +LIMIT_PERIOD_MIN_LEN = 1 +LIMIT_PERIOD_MAX_LEN = 300 + +# interface protocol +class ClientProtocol(): + IS_IOCOLLECT_VALID = 0 + GET_IO_DATA = 1 + PRO_END = 3 + +class ResultMessage(): + RESULT_SUCCEED = 0 + RESULT_UNKNOWN = 1 # unknown error + RESULT_NOT_PARAM = 2 # the parameter does not exist or the type does not match. + RESULT_INVALID_LENGTH = 3 # invalid parameter length. + RESULT_EXCEED_LIMIT = 4 # the parameter length exceeds the limit. 
+ RESULT_PARSE_FAILED = 5 # parse failed + RESULT_INVALID_CHAR = 6 # invalid char + +Result_Messages = { + ResultMessage.RESULT_SUCCEED: "Succeed", + ResultMessage.RESULT_UNKNOWN: "Unknown error", + ResultMessage.RESULT_NOT_PARAM: "The parameter does not exist or the type does not match", + ResultMessage.RESULT_INVALID_LENGTH: "Invalid parameter length", + ResultMessage.RESULT_EXCEED_LIMIT: "The parameter length exceeds the limit", + ResultMessage.RESULT_PARSE_FAILED: "Parse failed", + ResultMessage.RESULT_INVALID_CHAR: "Invalid char" +} + + +def client_send_and_recv(request_data, data_str_len, protocol): + """client socket send and recv message""" + try: + client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + except socket.error: + print("collect_plugin: client creat socket error") + return None + + try: + client_socket.connect(COLLECT_SOCKET_PATH) + except OSError: + client_socket.close() + print("collect_plugin: client connect error") + return None + + req_data_len = len(request_data) + request_msg = CLT_MAGIC + str(protocol).zfill(CLT_MSG_PRO_LEN) + str(req_data_len).zfill(CLT_MSG_LEN_LEN) + request_data + + try: + client_socket.send(request_msg.encode()) + res_data = client_socket.recv(len(RES_MAGIC) + CLT_MSG_PRO_LEN + data_str_len) + res_data = res_data.decode() + except (OSError, UnicodeError): + client_socket.close() + print("collect_plugin: client communicate error") + return None + + res_magic = res_data[:CLT_MSG_MAGIC_LEN] + if res_magic != "RES": + print("res msg format error") + return None + + protocol_str = res_data[CLT_MSG_MAGIC_LEN:CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN] + try: + protocol_id = int(protocol_str) + except ValueError: + print("recv msg protocol id is invalid %s", protocol_str) + return None + + if protocol_id >= ClientProtocol.PRO_END: + print("protocol id is invalid") + return None + + try: + res_data_len = int(res_data[CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN:]) + res_msg_data = client_socket.recv(res_data_len) + res_msg_data = res_msg_data.decode() + return res_msg_data + except (OSError, ValueError, UnicodeError): + print("collect_plugin: client recv res msg error") + finally: + client_socket.close() + + return None + +def validate_parameters(param, len_limit, char_limit): + ret = ResultMessage.RESULT_SUCCEED + if not param: + print("parm is invalid") + ret = ResultMessage.RESULT_NOT_PARAM + return [False, ret] + + if not isinstance(param, list): + print(f"{param} is not list type.") + ret = ResultMessage.RESULT_NOT_PARAM + return [False, ret] + + if len(param) <= 0: + print(f"{param} length is 0.") + ret = ResultMessage.RESULT_INVALID_LENGTH + return [False, ret] + + if len(param) > len_limit: + print(f"{param} length more than {len_limit}") + ret = ResultMessage.RESULT_EXCEED_LIMIT + return [False, ret] + + pattern = r'^[a-zA-Z0-9_-]+$' + for info in param: + if len(info) > char_limit: + print(f"{info} length more than {char_limit}") + ret = ResultMessage.RESULT_EXCEED_LIMIT + return [False, ret] + if not re.match(pattern, info): + print(f"{info} is invalid char") + ret = ResultMessage.RESULT_INVALID_CHAR + return [False, ret] + + return [True, ret] + +def is_iocollect_valid(period, disk_list=None, stage=None): + result = inter_is_iocollect_valid(period, disk_list, stage) + error_code = result['ret'] + if error_code != ResultMessage.RESULT_SUCCEED: + result['message'] = Result_Messages[error_code] + return result + +def inter_is_iocollect_valid(period, disk_list=None, stage=None): + result = {} + result['ret'] = ResultMessage.RESULT_UNKNOWN + 
+    result['message'] = ""
+
+    if not period or not isinstance(period, int):
+        result['ret'] = ResultMessage.RESULT_NOT_PARAM
+        return result
+    if period < LIMIT_PERIOD_MIN_LEN or period > LIMIT_PERIOD_MAX_LEN:
+        result['ret'] = ResultMessage.RESULT_INVALID_LENGTH
+        return result
+
+    if not disk_list:
+        disk_list = []
+    else:
+        res = validate_parameters(disk_list, LIMIT_DISK_LIST_LEN, LIMIT_DISK_CHAR_LEN)
+        if not res[0]:
+            result['ret'] = res[1]
+            return result
+
+    if not stage:
+        stage = []
+    else:
+        res = validate_parameters(stage, LIMIT_STAGE_LIST_LEN, LIMIT_STAGE_CHAR_LEN)
+        if not res[0]:
+            result['ret'] = res[1]
+            return result
+
+    req_msg_struct = {
+        'disk_list': json.dumps(disk_list),
+        'period': period,
+        'stage': json.dumps(stage)
+    }
+    request_message = json.dumps(req_msg_struct)
+    result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.IS_IOCOLLECT_VALID)
+    if not result_message:
+        print("collect_plugin: client_send_and_recv failed")
+        return result
+
+    try:
+        json.loads(result_message)
+    except json.JSONDecodeError:
+        print("is_iocollect_valid: json decode error")
+        result['ret'] = ResultMessage.RESULT_PARSE_FAILED
+        return result
+
+    result['ret'] = ResultMessage.RESULT_SUCCEED
+    result['message'] = result_message
+    return result
+
+def get_io_data(period, disk_list, stage, iotype):
+    result = inter_get_io_data(period, disk_list, stage, iotype)
+    error_code = result['ret']
+    if error_code != ResultMessage.RESULT_SUCCEED:
+        result['message'] = Result_Messages[error_code]
+    return result
+
+def inter_get_io_data(period, disk_list, stage, iotype):
+    result = {}
+    result['ret'] = ResultMessage.RESULT_UNKNOWN
+    result['message'] = ""
+
+    if not isinstance(period, int):
+        result['ret'] = ResultMessage.RESULT_NOT_PARAM
+        return result
+    if period < LIMIT_PERIOD_MIN_LEN or period > LIMIT_PERIOD_MAX_LEN:
+        result['ret'] = ResultMessage.RESULT_INVALID_LENGTH
+        return result
+
+    res = validate_parameters(disk_list, LIMIT_DISK_LIST_LEN, LIMIT_DISK_CHAR_LEN)
+    if not res[0]:
+        result['ret'] = res[1]
+        return result
+
+    res = validate_parameters(stage, LIMIT_STAGE_LIST_LEN, LIMIT_STAGE_CHAR_LEN)
+    if not res[0]:
+        result['ret'] = res[1]
+        return result
+
+    res = validate_parameters(iotype, LIMIT_IOTYPE_LIST_LEN, LIMIT_IOTYPE_CHAR_LEN)
+    if not res[0]:
+        result['ret'] = res[1]
+        return result
+
+    req_msg_struct = {
+        'disk_list': json.dumps(disk_list),
+        'period': period,
+        'stage': json.dumps(stage),
+        'iotype' : json.dumps(iotype)
+    }
+
+    request_message = json.dumps(req_msg_struct)
+    result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.GET_IO_DATA)
+    if not result_message:
+        print("collect_plugin: client_send_and_recv failed")
+        return result
+    try:
+        json.loads(result_message)
+    except json.JSONDecodeError:
+        print("get_io_data: json decode error")
+        result['ret'] = ResultMessage.RESULT_PARSE_FAILED
+        return result
+
+    result['ret'] = ResultMessage.RESULT_SUCCEED
+    result['message'] = result_message
+    return result
+
diff --git a/src/python/sentryCollector/collect_server.py b/src/python/sentryCollector/collect_server.py
new file mode 100644
index 0000000..fa49781
--- /dev/null
+++ b/src/python/sentryCollector/collect_server.py
@@ -0,0 +1,285 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+listen module
+"""
+import sys
+import signal
+import traceback
+import socket
+import os
+import json
+import logging
+import fcntl
+import select
+import threading
+import time
+
+from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA
+from .collect_config import CollectConfig
+
+SENTRY_RUN_DIR = "/var/run/sysSentry"
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+
+# socket param
+CLT_LISTEN_QUEUE_LEN = 5
+SERVER_EPOLL_TIMEOUT = 0.3
+
+# data length param
+CLT_MSG_HEAD_LEN = 9 #3+2+4
+CLT_MSG_PRO_LEN = 2
+CLT_MSG_MAGIC_LEN = 3
+CLT_MSG_LEN_LEN = 4
+
+# data flag param
+CLT_MAGIC = "CLT"
+RES_MAGIC = "RES"
+
+# interface protocol
+class ServerProtocol():
+    IS_IOCOLLECT_VALID = 0
+    GET_IO_DATA = 1
+    PRO_END = 3
+
+class CollectServer():
+
+    def __init__(self):
+
+        self.io_global_data = {}
+
+        self.stop_event = threading.Event()
+
+    def is_iocollect_valid(self, data_struct):
+
+        result_rev = {}
+        self.io_global_data = IO_GLOBAL_DATA
+
+        if len(IO_CONFIG_DATA) == 0:
+            logging.error("the collect thread is not started, the data is invalid. ")
+            return json.dumps(result_rev)
+
+        period_time = IO_CONFIG_DATA[0]
+        max_save = IO_CONFIG_DATA[1]
+
+        disk_list = json.loads(data_struct['disk_list'])
+        period = int(data_struct['period'])
+        stage_list = json.loads(data_struct['stage'])
+
+        if (period < period_time) or (period > period_time * max_save) or (period % period_time):
+            logging.error("is_iocollect_valid: period time: %d is invalid", period)
+            return json.dumps(result_rev)
+
+        for disk_name, stage_info in self.io_global_data.items():
+            if len(disk_list) > 0 and disk_name not in disk_list:
+                continue
+            result_rev[disk_name] = []
+            if len(stage_list) == 0:
+                result_rev[disk_name] = list(stage_info.keys())
+                continue
+            for stage_name, stage_data in stage_info.items():
+                if stage_name in stage_list:
+                    result_rev[disk_name].append(stage_name)
+
+        return json.dumps(result_rev)
+
+    def get_io_data(self, data_struct):
+        result_rev = {}
+        self.io_global_data = IO_GLOBAL_DATA
+
+        if len(IO_CONFIG_DATA) == 0:
+            logging.error("the collect thread is not started, the data is invalid. ")
+            return json.dumps(result_rev)
+        period_time = IO_CONFIG_DATA[0]
+        max_save = IO_CONFIG_DATA[1]
+
+        period = int(data_struct['period'])
+        disk_list = json.loads(data_struct['disk_list'])
+        stage_list = json.loads(data_struct['stage'])
+        iotype_list = json.loads(data_struct['iotype'])
+
+        if (period < period_time) or (period > period_time * max_save) or (period % period_time):
+            logging.error("get_io_data: period time: %d is invalid", period)
+            return json.dumps(result_rev)
+
+        collect_index = period // period_time - 1
+        logging.debug("period: %d, collect_index: %d", period, collect_index)
+
+        for disk_name, stage_info in self.io_global_data.items():
+            if disk_name not in disk_list:
+                continue
+            result_rev[disk_name] = {}
+            for stage_name, iotype_info in stage_info.items():
+                if len(stage_list) > 0 and stage_name not in stage_list:
+                    continue
+                result_rev[disk_name][stage_name] = {}
+                for iotype_name, iotype_info in iotype_info.items():
+                    if iotype_name not in iotype_list:
+                        continue
+                    if len(iotype_info) < collect_index:
+                        continue
+                    result_rev[disk_name][stage_name][iotype_name] = iotype_info[collect_index]
+
+        return json.dumps(result_rev)
+
+    def msg_data_process(self, msg_data, protocal_id):
+        """message data process"""
+        logging.debug("msg_data %s", msg_data)
+        protocol_name = msg_data[0]
+        try:
+            data_struct = json.loads(msg_data)
+        except json.JSONDecodeError:
+            logging.error("msg data process: json decode error")
+            return "Request message decode failed"
+
+        if protocal_id == ServerProtocol.IS_IOCOLLECT_VALID:
+            res_msg = self.is_iocollect_valid(data_struct)
+        elif protocal_id == ServerProtocol.GET_IO_DATA:
+            res_msg = self.get_io_data(data_struct)
+
+        return res_msg
+
+    def msg_head_process(self, msg_head):
+        """message head process"""
+        ctl_magic = msg_head[:CLT_MSG_MAGIC_LEN]
+        if ctl_magic != CLT_MAGIC:
+            logging.error("recv msg head magic invalid")
+            return None
+
+        protocol_str = msg_head[CLT_MSG_MAGIC_LEN:CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN]
+        try:
+            protocol_id = int(protocol_str)
+        except ValueError:
+            logging.error("recv msg protocol id is invalid")
+            return None
+
+        data_len_str = msg_head[CLT_MSG_MAGIC_LEN+CLT_MSG_PRO_LEN:CLT_MSG_HEAD_LEN]
+        try:
+            data_len = int(data_len_str)
+        except ValueError:
+            logging.error("recv msg data len is invalid %s", data_len_str)
+            return None
+
+        return [protocol_id, data_len]
+
+    def server_recv(self, server_socket: socket.socket):
+        """server receive"""
+        try:
+            client_socket, _ = server_socket.accept()
+            logging.debug("server_fd listen ok")
+        except socket.error:
+            logging.error("server accept failed, %s", socket.error)
+            return
+
+        try:
+            msg_head = client_socket.recv(CLT_MSG_HEAD_LEN)
+            logging.debug("recv msg head: %s", msg_head.decode())
+            head_info = self.msg_head_process(msg_head.decode())
+        except (OSError, UnicodeError):
+            client_socket.close()
+            logging.error("server recv HEAD failed")
+            return
+
+        protocol_id = head_info[0]
+        data_len = head_info[1]
+        logging.debug("msg protocol id: %d, data length: %d", protocol_id, data_len)
+        if protocol_id >= ServerProtocol.PRO_END:
+            client_socket.close()
+            logging.error("protocol id is invalid")
+            return
+
+        if data_len < 0:
+            client_socket.close()
+            logging.error("msg head parse failed")
+            return
+
+        try:
+            msg_data = client_socket.recv(data_len)
+            msg_data_decode = msg_data.decode()
+            logging.debug("msg data %s", msg_data_decode)
+        except (OSError, UnicodeError):
+            client_socket.close()
+            logging.error("server recv MSG failed")
+            return
+
+        res_data = self.msg_data_process(msg_data_decode, protocol_id)
+        logging.debug("res data %s", res_data)
+
+        # server send
+        res_head = RES_MAGIC
+        res_head += str(protocol_id).zfill(CLT_MSG_PRO_LEN)
+        res_data_len = str(len(res_data)).zfill(CLT_MSG_LEN_LEN)
+        res_head += res_data_len
+        logging.debug("res head %s", res_head)
+
+        res_msg = res_head + res_data
+        logging.debug("res msg %s", res_msg)
+
+        try:
+            client_socket.send(res_msg.encode())
+        except OSError:
+            logging.error("server recv failed")
+        finally:
+            client_socket.close()
+        return
+
+    def server_fd_create(self):
+        """create server fd"""
+        if not os.path.exists(SENTRY_RUN_DIR):
+            logging.error("%s not exist, failed", SENTRY_RUN_DIR)
+            return None
+
+        try:
+            server_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            server_fd.setblocking(False)
+            if os.path.exists(COLLECT_SOCKET_PATH):
+                os.remove(COLLECT_SOCKET_PATH)
+
+            server_fd.bind(COLLECT_SOCKET_PATH)
+            os.chmod(COLLECT_SOCKET_PATH, 0o600)
+            server_fd.listen(CLT_LISTEN_QUEUE_LEN)
+            logging.debug("%s bind and listen", COLLECT_SOCKET_PATH)
+        except socket.error:
+            logging.error("server fd create failed")
+            server_fd = None
+
+        return server_fd
+
+
+    def server_loop(self):
+        """main loop"""
+        logging.info("collect server thread start")
+        server_fd = self.server_fd_create()
+        if not server_fd:
+            return
+
+        epoll_fd = select.epoll()
+        epoll_fd.register(server_fd.fileno(), select.EPOLLIN)
+
+        logging.debug("start server_loop loop")
+        while True:
+            if self.stop_event.is_set():
+                logging.info("collect server thread exit")
+                server_fd = None
+                return
+            try:
+                events_list = epoll_fd.poll(SERVER_EPOLL_TIMEOUT)
+                for event_fd, _ in events_list:
+                    if event_fd == server_fd.fileno():
+                        self.server_recv(server_fd)
+                    else:
+                        continue
+            except socket.error:
+                pass
+
+    def stop_thread(self):
+        logging.info("collect server thread is preparing to exit")
+        self.stop_event.set()
diff --git a/src/python/sentryCollector/collectd.py b/src/python/sentryCollector/collectd.py
new file mode 100644
index 0000000..b77c642
--- /dev/null
+++ b/src/python/sentryCollector/collectd.py
@@ -0,0 +1,99 @@
+# coding: utf-8
+# Copyright (c) 2024 Huawei Technologies Co., Ltd.
+# sysSentry is licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+"""
+main loop for collect.
+"""
+import sys
+import signal
+import traceback
+import socket
+import os
+import json
+import logging
+import fcntl
+import select
+
+import threading
+
+from .collect_io import CollectIo
+from .collect_server import CollectServer
+from .collect_config import CollectConfig
+
+SENTRY_RUN_DIR = "/var/run/sysSentry"
+COLLECT_SOCKET_PATH = "/var/run/sysSentry/collector.sock"
+SENTRY_RUN_DIR_PERM = 0o750
+
+COLLECT_LOG_FILE = "/var/log/sysSentry/collector.log"
+Thread_List = []
+Module_Map_Class = {"io" : CollectIo}
+
+def remove_sock_file():
+    try:
+        os.unlink(COLLECT_SOCKET_PATH)
+    except FileNotFoundError:
+        pass
+
+def sig_handler(signum, _f):
+    if signum not in (signal.SIGINT, signal.SIGTERM):
+        return
+    for i in range(len(Thread_List)):
+        Thread_List[i][0].stop_thread()
+
+    remove_sock_file()
+    sys.exit(0)
+
+def main():
+    """main
+    """
+    if not os.path.exists(SENTRY_RUN_DIR):
+        os.mkdir(SENTRY_RUN_DIR)
+        os.chmod(SENTRY_RUN_DIR, mode=SENTRY_RUN_DIR_PERM)
+
+    logging.basicConfig(filename=COLLECT_LOG_FILE, level=logging.INFO)
+    os.chmod(COLLECT_LOG_FILE, 0o600)
+
+    try:
+        signal.signal(signal.SIGINT, sig_handler)
+        signal.signal(signal.SIGTERM, sig_handler)
+        signal.signal(signal.SIGHUP, sig_handler)
+
+        logging.info("finish main parse_args")
+
+        module_config = CollectConfig()
+        module_list = module_config.modules
+
+        # listen thread
+        cs = CollectServer()
+        listen_thread = threading.Thread(target=cs.server_loop)
+        listen_thread.start()
+        Thread_List.append([cs, listen_thread])
+
+        # collect thread
+        for info in module_list:
+            class_name = Module_Map_Class.get(info)
+            if not class_name:
+                logging.info("%s correspond to class is not exists", info)
+                continue
+            cn = class_name(module_config)
+            collect_thread = threading.Thread(target=cn.main_loop)
+            collect_thread.start()
+            Thread_List.append([cn, collect_thread])
+
+        for i in range(len(Thread_List)):
+            Thread_List[i][1].join()
+
+    except Exception:
+        logging.error('%s', traceback.format_exc())
+    finally:
+        pass
+
+    logging.info("All threads have finished. Main thread is exiting.")
\ No newline at end of file
diff --git a/src/python/setup.py b/src/python/setup.py
index f96a96e..c28c691 100644
--- a/src/python/setup.py
+++ b/src/python/setup.py
@@ -31,7 +31,9 @@ setup(
         'console_scripts': [
             'cpu_sentry=syssentry.cpu_sentry:main',
             'syssentry=syssentry.syssentry:main',
-            'xalarmd=xalarm.xalarm_daemon:alarm_process_create'
+            'xalarmd=xalarm.xalarm_daemon:alarm_process_create',
+            'sentryCollector=sentryCollector.collectd:main',
+            'avg_block_io=sentryPlugins.avg_block_io.avg_block_io:main'
         ]
     },
 )
-- 
2.33.0