Source code for deepdataspace.services.service

"""
deepdataspace.services.service

Base class and common interfaces for all services.
"""

import contextlib
import os
import signal
import subprocess
import sys
import time
from threading import Thread

import psutil

from deepdataspace.services import config
from deepdataspace.utils.os import PLATFORM
from deepdataspace.utils.os import Platforms
from deepdataspace.utils.os import get_pid_by_cmd_id


[docs]def terminate_all_children(sig, frame): process = psutil.Process() for child in process.children(recursive=True): try: config.print_if_verbose(f"terminating process, pid={child.pid}, cmd_line={' '.join(child.cmdline())}") os.kill(child.pid, signal.SIGTERM) except: # don't block program terminating in any way pass exit(0)
signal.signal(signal.SIGTERM, terminate_all_children) signal.signal(signal.SIGINT, terminate_all_children) _progress_logs = []
[docs]@contextlib.contextmanager def progress_log(log_prefix: str): """ Print a dynamic log message while we start services in the background. So users won't mistake that our process is dead. """ _progress_logs.append(log_prefix) running = False def _threaded(): idx = 0 while running: if _progress_logs and idx % 10 == 0: _progress_logs[-1] += "." print(_progress_logs[-1], flush=True, end="\r") if config.VERBOSE_LOG: return idx += 1 time.sleep(0.1) try: running = True print(_progress_logs[-1], flush=True, end="\r") thread = Thread(target=_threaded) thread.setDaemon(True) thread.start() yield finally: running = False clean_log = " " * len(_progress_logs.pop()) print(clean_log, flush=True, end="\r")
[docs]class Service: def __init__(self, name: str, cmd_list: list): self.name = name self.cmd_list = cmd_list self.cmd_id = None self.pid = None self.started = False
[docs] @staticmethod def close_process(pid: int): """ Close a process by pid. Try p.terminate first, then p.kill if it survives more than 1 second. """ if not psutil.pid_exists(pid): return p = psutil.Process(pid) p.terminate() time.sleep(0.4) if p.is_running(): time.sleep(0.6) try: p.kill() except psutil.NoSuchProcess: pass
[docs] def clean_process(self, ): """ Close all process identified by cmd_id """ pids = get_pid_by_cmd_id(self.cmd_id) for pid in pids: config.print_if_verbose(f"{self.name} is already started, killing it now...") self.close_process(pid)
[docs] def open_process(self, cmd: list, wait: int = 2): """ Start a subprocess with cmd and argument. :param cmd: the cmd and arguments used to start the subprocess :param wait: num of seconds to wait after init the subprocess """ if config.VERBOSE_LOG is True: stdout = sys.stdout stderr = sys.stderr else: stdout = subprocess.DEVNULL stderr = subprocess.DEVNULL if PLATFORM == Platforms.Win: p = subprocess.Popen(cmd, subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP, stdout=stdout, stderr=stderr) else: p = subprocess.Popen(cmd, preexec_fn=os.setpgrp, stdout=stdout, stderr=stderr) self.pid = p.pid time.sleep(wait) # wait for a while, so if the process exit immediately, we have a chance to catch its status
[docs] def check_process(self): """ Check if the process identified by cmd_is is running. """ pids = get_pid_by_cmd_id(self.cmd_id) if pids: config.print_if_verbose(f"{self.name} is started") self.pid = pids[0] self.started = True else: config.print_if_verbose(f"failed to start {self.name}") self.pid = None self.started = False
[docs] def start_process(self, cmd_list: list, wait: int = 2): """ Start a subprocess with the command line arguments. :param cmd_list: the command line arguments to start the process :param cmd_id: the command line identifier, this must be unique enough to identify the process from all processes :param wait: how many seconds to wait after the subprocess is called """ config.print_if_verbose(f"trying to start {self.name} with command: {' '.join(cmd_list)}") self.clean_process() # close started subprocess first self.open_process(cmd_list, wait) # then open the process, ensure the process is started by us self.check_process() # double-check the process status