Skip to content

process_measurement

mlte/measurement/process_measurement.py

Base class for asynchronously measuring external processes.

ProcessMeasurement

Bases: Measurement

Base class to be extended to measure external processes.

Source code in mlte/measurement/process_measurement.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class ProcessMeasurement(Measurement):
    """
    Base class to be extended to measure external processes.

    A measurement is started in a background thread with evaluate_async(),
    and its result is collected afterwards with wait_for_output().
    """

    @staticmethod
    def start_script(script: str, arguments: List[str]) -> int:
        """
        Initialize an external Python process running training or similar script.

        :param script: The full path to a Python script with the training or equivalent process to run.
        :param arguments: A list of string arguments for the process.
        :return: the id of the process that was created.
        """
        return job.spawn_python_job(script, arguments)

    @staticmethod
    def start_process(process: str, arguments: List[str]) -> int:
        """
        Initialize an external process running training or similar.

        :param process: The full path to a process to run.
        :param arguments: A list of string arguments for the process.
        :return: the id of the process that was created.
        """
        return job.spawn_job(process, arguments)

    def __init__(self, instance: ProcessMeasurement, identifier: str):
        """
        Initialize a new ProcessMeasurement measurement.

        :param instance: Forwarded unchanged to the parent class initializer.
        :param identifier: A unique identifier for the measurement
        """
        super().__init__(instance, identifier)
        # Background thread running the most recent evaluate_async() call.
        self.thread: Optional[threading.Thread] = None
        # Result stored by the background thread once the measurement finishes.
        self.stored_value: Optional[Value] = None
        # Error message recorded by the background thread; empty means no error.
        self.error: str = ""

    def evaluate_async(self, pid: int, *args, **kwargs):
        """
        Monitor an external process at `pid` in a separate thread until it stops.
        Equivalent to evaluate(), but does not return the value immediately as it works in the background.

        :param pid: The process identifier
        :param args: Extra positional arguments forwarded to the measurement call.
        :param kwargs: Extra keyword arguments forwarded to the measurement call.
        """
        # Clear the outcome of any previous run before starting a new one.
        self.error = ""
        self.stored_value = None
        self.thread = threading.Thread(
            target=self._run_call, args=(pid, *args), kwargs=kwargs
        )
        self.thread.start()

    def _run_call(self, pid, *args, **kwargs):
        """
        Runs the internal __call__ method that should implement the measurement, and stores its results when it finishes.

        :param pid: The process identifier
        """
        try:
            self.stored_value = self.__call__(pid, *args, **kwargs)
        except Exception as e:
            # This runs inside the worker thread, so the failure is recorded
            # here and re-raised to the caller by wait_for_output().
            self.error = f"Could not evaluate process: {e}"

    def wait_for_output(self, poll_interval: int = 1) -> Value:
        """
        Needed to get the output of a measurement executed in parallel using evaluate_async. Waits for the thread to finish.

        :param poll_interval: The poll interval in seconds
        :return: The resulting value of measurement execution, with semantics
        :raises RuntimeError: If the measurement raised an exception in the background thread.
        :raises Exception: If no measurement was started, or if it produced no value.
        """
        if self.thread is None:
            raise Exception(
                "Can't wait for value, no process is currently running."
            )

        # join() returns as soon as the thread ends, so the caller is not kept
        # waiting for the rest of a sleep interval after the work is done.
        while self.thread.is_alive():
            self.thread.join(timeout=poll_interval)

        # If an exception was raised, return it here as an exception as well.
        if self.error:
            raise RuntimeError(self.error)

        if self.stored_value is None:
            raise Exception("No valid value was returned from measurement.")
        return self.stored_value

__init__(instance, identifier)

Initialize a new ProcessMeasurement measurement.

Parameters:

Name Type Description Default
identifier str

A unique identifier for the measurement

required
Source code in mlte/measurement/process_measurement.py
47
48
49
50
51
52
53
54
55
56
def __init__(self, instance: ProcessMeasurement, identifier: str):
    """
    Set up a ProcessMeasurement with no background run in progress.

    :param instance: Forwarded unchanged to the parent class initializer.
    :param identifier: A unique identifier for the measurement
    """
    super().__init__(instance, identifier)

    # No error and no result yet; both are filled in by a later run.
    self.error: str = ""
    self.stored_value: Optional[Value] = None

    # No background thread exists until a measurement is started.
    self.thread: Optional[threading.Thread] = None

evaluate_async(pid, *args, **kwargs)

Monitor an external process at pid in a separate thread until it stops. Equivalent to evaluate(), but does not return the value immediately as it works in the background.

Parameters:

Name Type Description Default
pid int

The process identifier

required
Source code in mlte/measurement/process_measurement.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def evaluate_async(self, pid: int, *args, **kwargs):
    """
    Start measuring the external process `pid` in a background thread.
    Works like evaluate(), except that the value is not returned here because
    the measurement keeps running in the background.

    :param pid: The process identifier
    """
    # Prepare the worker first; it only begins running once start() is called.
    worker = threading.Thread(
        target=self._run_call, args=(pid, *args), kwargs=kwargs
    )

    # Forget the outcome of any earlier run.
    self.stored_value = None
    self.error = ""

    self.thread = worker
    worker.start()

start_process(process, arguments) staticmethod

Initialize an external process running training or similar.

Parameters:

Name Type Description Default
process str

The full path to a process to run.

required
arguments List[str]

A list of string arguments for the process.

required

Returns:

Type Description
int

the id of the process that was created.

Source code in mlte/measurement/process_measurement.py
36
37
38
39
40
41
42
43
44
45
@staticmethod
def start_process(process: str, arguments: List[str]) -> int:
    """
    Launch an external process (training or similar) and report its id.

    :param process: The full path to a process to run.
    :param arguments: A list of string arguments for the process.
    :return: the id of the process that was created.
    """
    process_id = job.spawn_job(process, arguments)
    return process_id

start_script(script, arguments) staticmethod

Initialize an external Python process running training or similar script.

Parameters:

Name Type Description Default
script str

The full path to a Python script with the training or equivalent process to run.

required
arguments List[str]

A list of string arguments for the process.

required

Returns:

Type Description
int

the id of the process that was created.

Source code in mlte/measurement/process_measurement.py
25
26
27
28
29
30
31
32
33
34
@staticmethod
def start_script(script: str, arguments: List[str]) -> int:
    """
    Launch an external Python process running a training or similar script, and report its id.

    :param script: The full path to a Python script with the training or equivalent process to run.
    :param arguments: A list of string arguments for the process.
    :return: the id of the process that was created.
    """
    process_id = job.spawn_python_job(script, arguments)
    return process_id

wait_for_output(poll_interval=1)

Retrieves the output of a measurement that was started in the background with evaluate_async. Waits for the thread to finish.

Parameters:

Name Type Description Default
poll_interval int

The poll interval in seconds

1

Returns:

Type Description
Value

The resulting value of measurement execution, with semantics

Source code in mlte/measurement/process_measurement.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def wait_for_output(self, poll_interval: int = 1) -> Value:
    """
    Needed to get the output of a measurement executed in parallel using evaluate_async. Waits for the thread to finish.

    :param poll_interval: The poll interval in seconds
    :return: The resulting value of measurement execution, with semantics
    :raises RuntimeError: If an error message was recorded for the measurement.
    :raises Exception: If no measurement was started, or if it produced no value.
    """
    if self.thread is None:
        raise Exception(
            "Can't wait for value, no process is currently running."
        )

    # join() returns as soon as the thread ends, so the caller is not kept
    # waiting for the rest of a sleep interval after the work is done.
    while self.thread.is_alive():
        self.thread.join(timeout=poll_interval)

    # If an exception was raised, return it here as an exception as well.
    if self.error:
        raise RuntimeError(self.error)

    if self.stored_value is None:
        raise Exception("No valid value was returned from measurement.")
    return self.stored_value