Source code for tsnkit.core._system

"""
Author: Chuanyu (skewcy@gmail.com)
_system.py (c) 2023
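Desc: System helpers: memory / CPU-time logging, output-folder setup, and watchdog utilities that signal child processes exceeding time or memory limits.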
Created:  2023-10-08T06:14:11.998Z
"""

import asyncio
import multiprocessing
from multiprocessing.sharedctypes import SynchronizedBase, synchronized
import signal
import time

from ._constants import METHOD_TO_PROCNUM, T_LIMIT

import psutil
import os
import sys
import subprocess

if sys.platform != "win32" and sys.platform != "cygwin":
    import resource


def mem_log() -> float:
    """Return this process's memory usage in MB.

    On Windows/Cygwin (where the ``resource`` module is not imported) this is
    the *current* resident set size reported by psutil. Everywhere else it is
    the *peak* resident set size from ``resource.getrusage``, whose
    ``ru_maxrss`` unit is platform dependent: kilobytes on Linux, bytes on
    macOS.
    """
    if sys.platform == "win32" or sys.platform == "cygwin":
        # psutil reports RSS in bytes.
        return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        # macOS reports ru_maxrss in bytes, not kilobytes.
        return max_rss / 1024 / 1024
    # Linux reports ru_maxrss in kilobytes; other non-macOS Unixes are
    # assumed to do the same.
    return max_rss / 1024
def time_log() -> float:
    """Return the user-mode CPU time, in seconds, consumed so far by this process.

    Windows and Cygwin do not get the ``resource`` module in this file, so
    psutil is queried there instead.
    """
    if sys.platform in ("win32", "cygwin"):
        return psutil.Process(os.getpid()).cpu_times().user
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return usage.ru_utime
def is_timeout(thres: float) -> bool:
    """Tell whether this process's user CPU time (see ``time_log``) exceeds ``thres`` seconds."""
    used = time_log()
    return used > thres
def init_output_folder(path: str) -> None:
    """Ensure the output folder ``path`` exists and contains a ``readme`` file.

    Missing parent directories are created too. The ``readme`` marker is
    created empty when absent; an existing one is left untouched.

    Args:
        path: directory to prepare; a trailing path separator is optional.

    Raises:
        FileExistsError: if ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)
    # os.path.join instead of string concatenation: "out" + "readme" used to
    # create "outreadme" next to the folder instead of inside it.
    readme = os.path.join(path, "readme")
    # Append mode creates the file if missing without truncating an existing
    # one, and uses normal file permissions (os.open defaulted to 0o777).
    with open(readme, "a"):
        pass
def oom_manager(name: str) -> subprocess.Popen:
    """Launch ``bash ./killif.sh`` as an out-of-memory watchdog for this process.

    The script receives ``METHOD_TO_PROCNUM[name] * 1024 * 1024`` and the
    current PID as arguments. Its stdout and stderr are written to
    ``../logs/<name>_kill.log`` and ``../logs/<name>_kill.err`` (relative to
    the current working directory).

    Args:
        name: key into ``METHOD_TO_PROCNUM``; also the log-file prefix.

    Returns:
        The ``Popen`` handle of the started script.
    """
    # Popen duplicates these descriptors into the child, so the parent's
    # copies can be closed as soon as the child is spawned. Previously they
    # were never closed, leaking two descriptors per call.
    with open("../logs/" + name + "_kill.log", "w") as kill, open(
        "../logs/" + name + "_kill.err", "w"
    ) as kill_err:
        return subprocess.Popen(
            [
                "bash",
                "./killif.sh",
                str(METHOD_TO_PROCNUM[name] * 1024 * 1024),
                str(os.getpid()),
            ],
            stdout=kill,
            stderr=kill_err,
        )
def find_files_with_prefix(directory: str, prefix: str):
    """Return paths of CSV files directly inside ``directory`` starting with ``prefix``.

    Only regular files are returned (no recursion into sub-directories). A
    file counts as CSV when ``".csv"`` occurs in its *file name*. The
    directory part is deliberately not inspected; previously a directory
    named e.g. ``results.csv/`` made every prefix-matching file qualify.

    Args:
        directory: folder to scan.
        prefix: required file-name prefix.

    Returns:
        ``os.path.join(directory, name)`` for each match, in ``os.listdir`` order.
    """
    return [
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.startswith(prefix)
        and ".csv" in entry
        and os.path.isfile(os.path.join(directory, entry))
    ]
def _process_exited(proc) -> bool:
    """Return True once ``proc`` (a ``subprocess.Popen`` or ``psutil.Process``) has exited."""
    poll = getattr(proc, "poll", None)
    if poll is not None:
        # subprocess.Popen: poll() returns the exit code once finished.
        return poll() is not None
    # psutil.Process has no poll(); use is_running() instead.
    return not proc.is_running()


async def kill_process(proc: "psutil.Process", time_limit: float):
    """Send SIGINT to ``proc`` every ``time_limit`` seconds until it exits.

    Accepts either a ``subprocess.Popen`` or a ``psutil.Process``. The old
    version called ``proc.poll()`` (missing on psutil.Process) and
    ``proc.kill(signal.SIGINT)``. ``kill()`` takes no signal argument on
    either type, so every call raised.

    Args:
        proc: process handle to interrupt.
        time_limit: seconds to wait between consecutive SIGINTs.
    """
    while not _process_exited(proc):
        # send_signal() exists on both Popen and psutil.Process.
        proc.send_signal(signal.SIGINT)
        await asyncio.sleep(time_limit)
def kill_if(main_proc: int, mem_limit: int, time_limit: int, sig: SynchronizedBase):
    """Watchdog loop: signal worker processes that exceed a time or memory limit.

    The loop never returns on its own (``while True`` with no exit); it is
    presumably terminated by whoever started it. Every ``BREAK_TIME``
    seconds it scans all processes and considers:

    * processes whose name contains ``python`` and whose parent is
      ``main_proc``, and
    * any process whose name contains ``cpoptimizer`` (any parent).

    ``main_proc`` and this watchdog process itself are always skipped.

    A considered process whose wall-clock age exceeds ``1.1 * time_limit``
    or whose RSS exceeds ``mem_limit`` receives SIGINT. If, in addition, it
    is a zombie, older than ``1.2 * time_limit``, or above
    ``1.1 * mem_limit``, it first receives SIGKILL. A signalled PID is not
    signalled again for ``WAIT_TIME`` seconds.

    After every scan that found no non-zombie ``python`` worker with
    non-zero user CPU time, ``sig.value`` is decremented by one.

    Args:
        main_proc: PID of the main process whose children are watched.
        mem_limit: memory limit in GB, compared against RSS.
        time_limit: time limit in seconds, compared against wall-clock age.
        sig: shared counter, decremented while no worker is alive.
    """
    time.sleep(1)
    BREAK_TIME = 0.5  ## Check every 0.5 seconds
    WAIT_TIME = 60  ## Wait for 1 mins before next killing
    self_proc = os.getpid()
    mem_limit_bytes = mem_limit * 1024**3
    # pid -> time the last signal was sent; a pid is skipped while its
    # entry is younger than WAIT_TIME.
    pids_killed_time = {}  # type: ignore

    while True:
        keep_alive = False
        current_time = time.time()
        # Expired entries behave exactly like absent ones, so dropping them
        # keeps the bookkeeping bounded without changing behavior.
        pids_killed_time = {
            pid: t
            for pid, t in pids_killed_time.items()
            if current_time - t < WAIT_TIME
        }

        for proc in psutil.process_iter(["pid", "name", "ppid", "cpu_times", "status"]):
            info = proc.info  # type: ignore
            # psutil fills attributes it cannot read with None; a None name
            # used to crash the whole watchdog with a TypeError.
            name = info["name"] or ""
            pid = info["pid"]
            is_python = "python" in name
            is_cpoptimizer = "cpoptimizer" in name

            if not (is_python or is_cpoptimizer):
                continue
            if info["ppid"] != main_proc and not is_cpoptimizer:
                continue
            if pid == main_proc or pid == self_proc:
                continue

            cpu_times = info["cpu_times"]
            if (
                is_python
                and cpu_times is not None
                and cpu_times.user > 0
                and info["status"] != psutil.STATUS_ZOMBIE
            ):
                keep_alive = True

            if pid in pids_killed_time:
                # Signalled less than WAIT_TIME seconds ago; give it time.
                continue

            try:
                mem = proc.memory_info().rss
                elapsed = current_time - proc.create_time()
                if elapsed > time_limit * 1.1 or mem > mem_limit_bytes:
                    if (
                        info["status"] == psutil.STATUS_ZOMBIE
                        or elapsed > time_limit * 1.2
                        or mem > mem_limit_bytes * 1.1
                    ):
                        proc.send_signal(signal.SIGKILL)
                    proc.send_signal(signal.SIGINT)
                    pids_killed_time[pid] = current_time
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # Process vanished or is not ours to inspect/signal.
                pass
            except Exception as exc:
                # Stay best-effort, but do not hide unexpected failures.
                print(
                    f"kill_if: error while checking pid {pid}: {exc!r}",
                    file=sys.stderr,
                    flush=True,
                )

        if not keep_alive:
            # `-=` on a shared Value is a non-atomic read-modify-write.
            with sig.get_lock():  # type: ignore
                sig.value -= 1  # type: ignore
        time.sleep(BREAK_TIME)