Skip to content

process_measurement

Base class for measurement of external processes asynchronously.

ProcessMeasurement

Bases: Measurement

Base class to be extended to measure external processes.

Source code in mlte/measurement/process_measurement.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class ProcessMeasurement(Measurement):
    """
    Base class to be extended to measure external processes.

    Subclasses implement `__call__`, which always receives the id of the
    process to measure as its first argument. A measurement can be run
    synchronously with `evaluate` (spawns the process and blocks), or in the
    background with `evaluate_async` followed by `wait_for_output`.
    """

    PROCESS_GROUP_KEY = "group"
    """Key to store optional process groups used by this measurement."""

    @staticmethod
    def start_script(command: list[str]) -> int:
        """
        Initialize an external Python process running training or similar script.

        :param command: A list with the full path to a Python script with the training or
                        equivalent process to run, followed by the string arguments for it.
                        Must contain at least one item.
        :return: the id of the process that was created.
        """
        return job.spawn_python_job(command[0], command[1:])

    @staticmethod
    def start_process(command: list[str]) -> int:
        """
        Initialize an external process running training or similar.

        :param command: A list with the full path to a process to run, followed by the
                        string arguments for it. Must contain at least one item.
        :return: the id of the process that was created.
        """
        return job.spawn_job(command[0], command[1:])

    def __init__(
        self, test_case_id: Optional[str] = None, group: Optional[str] = None
    ):
        """
        Initialize a new ProcessMeasurement measurement.

        :param test_case_id: A unique identifier for the measurement
        :param group: An optional group id, if we want to group this measurement with others.
        """
        # Assigned before the base initializer runs. NOTE(review): presumably
        # Measurement.__init__ builds metadata through generate_metadata(),
        # which reads self.group - confirm before reordering.
        self.group: Optional[str] = group
        """An optional group id, if we want to group this measurement with others."""

        super().__init__(test_case_id)

        self.thread: Optional[threading.Thread] = None
        """Thread that will be used to run the measurement process."""

        self.stored_value: Optional[Evidence] = None
        """The result of the measurement."""

        self.error: str = ""
        """Any error messages from running measurement."""

    # Overridden.
    def generate_metadata(self) -> MeasurementMetadata:
        """Returns Measurement metadata, with the process group added when one is set."""
        metadata = super().generate_metadata()

        # Add specific group being used, if any (None and "" are both skipped).
        if self.group:
            metadata.additional_data[self.PROCESS_GROUP_KEY] = self.group

        return metadata

    # Overridden.
    def additional_setup(self, model: MeasurementMetadata):
        """
        Customized method to set up the optional group id from metadata.

        :param model: Metadata whose `additional_data` may hold the group under PROCESS_GROUP_KEY.
        """
        # Set up the group; it is reset to None when the metadata has none.
        extra_data = model.additional_data
        self.group = (
            extra_data[self.PROCESS_GROUP_KEY]
            if self.PROCESS_GROUP_KEY in extra_data
            else None
        )

        # Update metadata.
        self.set_metadata()

    # Overridden.
    @abstractmethod
    def __call__(self, pid: int, *args, **kwargs) -> Evidence:
        """Calls for process based measurement will always have pid as the first argument."""
        raise NotImplementedError(
            "Cannot evaluate abstract process measurement."
        )

    def evaluate(self, command: list[str], *args, **kwargs) -> Evidence:
        """
        Evaluate by starting the given process and waiting for it to complete.

        :param command: The full path of the process to run, followed by its string arguments.
        :return: The evidence produced by the measurement.
        :raises RuntimeError: If `command` is empty, or if the measurement raised an error.
        """
        if len(command) < 1:
            raise RuntimeError(
                f"Command list must as least have one item: {command}"
            )

        pid = ProcessMeasurement.start_process(command)
        self.evaluate_async(pid, *args, **kwargs)
        return self.wait_for_output()

    def evaluate_async(self, pid: int, *args, **kwargs):
        """
        Monitor an external process at `pid` in a separate thread until it stops.
        Equivalent to evaluate(), but does not return the value immediately as it works in the background.
        Use wait_for_output() to collect the result.

        :param pid: The process identifier
        """
        # Clear any leftovers from a previous run before starting a new one.
        self.error = ""
        self.stored_value = None
        self.thread = threading.Thread(
            target=self._evaluate_thread, args=(pid, *args), kwargs=kwargs
        )
        self.thread.start()

    def _evaluate_thread(self, pid, *args, **kwargs):
        """
        Runs the base class evaluate method with the given pid, and stores its result when it finishes.

        Any exception is captured as text in `self.error` rather than propagated,
        so that wait_for_output() can raise it from the waiting thread.
        """
        try:
            self.stored_value = super().evaluate(pid, *args, **kwargs)
        except Exception as e:
            self.error = f"Could not evaluate process: {e} - trace: {traceback.format_exc()}"

    def wait_for_output(self, poll_interval: int = 1) -> Evidence:
        """
        Needed to get the output of a measurement executed in parallel using evaluate_async. Waits for the thread to finish.

        :param poll_interval: Maximum time, in seconds, spent in each wait on the thread.
        :return: The value stored by the measurement thread.
        :raises RuntimeError: If the measurement thread recorded an error.
        :raises Exception: If no thread was started, or it finished without storing a value.
        """
        worker = self.thread
        if worker is None:
            raise Exception(
                "Can't wait for value, no process is currently running."
            )

        # join() with a timeout returns as soon as the thread ends, so the
        # result is picked up right away instead of after sleeping out the
        # remainder of a poll interval.
        while worker.is_alive():
            worker.join(timeout=poll_interval)

        # If an exception was raised, return it here as an exception as well.
        if self.error != "":
            raise RuntimeError(self.error)

        if self.stored_value is None:
            raise Exception("No valid value was returned from measurement.")
        return self.stored_value

PROCESS_GROUP_KEY = 'group' class-attribute instance-attribute

Key to store optional process groups used by this measurement.

error = '' instance-attribute

Any error messages from running measurement.

group = group instance-attribute

An optional group id, if we want to group this measurement with others.

stored_value = None instance-attribute

The result of the measurement.

thread = None instance-attribute

Thread that will be used to run the measurement process.

__call__(pid, *args, **kwargs) abstractmethod

Calls for process-based measurements will always have pid as the first argument.

Source code in mlte/measurement/process_measurement.py
 95
 96
 97
 98
 99
100
@abstractmethod
def __call__(self, pid: int, *args, **kwargs) -> Evidence:
    """
    Abstract entry point; process based measurements always take pid first.

    :param pid: The id of the process to measure.
    :raises NotImplementedError: Always, in this base implementation.
    """
    message = "Cannot evaluate abstract process measurement."
    raise NotImplementedError(message)

__init__(test_case_id=None, group=None)

Initialize a new ProcessMeasurement measurement.

Parameters:

Name Type Description Default
test_case_id Optional[str]

A unique identifier for the measurement

None
group Optional[str]

An optional group id, if we want to group this measurement with others.

None
Source code in mlte/measurement/process_measurement.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self, test_case_id: Optional[str] = None, group: Optional[str] = None
):
    """
    Create a ProcessMeasurement with no thread, result or error recorded yet.

    :param test_case_id: A unique identifier for the measurement
    :param group: An optional group id, if we want to group this measurement with others.
    """
    # Assigned ahead of the base initializer. NOTE(review): presumably the
    # base class reads self.group while setting up - confirm before moving.
    self.group: Optional[str] = group
    """An optional group id, if we want to group this measurement with others."""

    super().__init__(test_case_id)

    self.error: str = ""
    """Any error messages from running measurement."""

    self.stored_value: Optional[Evidence] = None
    """The result of the measurement."""

    self.thread: Optional[threading.Thread] = None
    """Thread that will be used to run the measurement process."""

additional_setup(model)

Customized method to set up Optional group id from metadata.

Source code in mlte/measurement/process_measurement.py
83
84
85
86
87
88
89
90
91
92
def additional_setup(self, model: MeasurementMetadata):
    """
    Restore the optional group id from the given metadata.

    :param model: Metadata whose `additional_data` may hold the group id.
    """
    group_key = self.PROCESS_GROUP_KEY
    # The group is reset to None when the metadata does not carry one.
    self.group = (
        model.additional_data[group_key]
        if group_key in model.additional_data
        else None
    )

    # Refresh the stored metadata so that it reflects the new group.
    self.set_metadata()

evaluate(command, *args, **kwargs)

Evaluate by starting the given process and waiting for it to complete.

Source code in mlte/measurement/process_measurement.py
102
103
104
105
106
107
108
109
110
111
112
def evaluate(self, command: list[str], *args, **kwargs) -> Evidence:
    """
    Start the given process, measure it, and block until the result is ready.

    :param command: The full path of the process to run, followed by its string arguments.
    :return: The evidence produced by the measurement.
    :raises RuntimeError: If `command` has no items.
    """
    if len(command) == 0:
        raise RuntimeError(
            f"Command list must as least have one item: {command}"
        )

    process_id = ProcessMeasurement.start_process(command)
    self.evaluate_async(process_id, *args, **kwargs)
    return self.wait_for_output()

evaluate_async(pid, *args, **kwargs)

Monitor an external process at pid in a separate thread until it stops. Equivalent to evaluate(), but does not return the value immediately as it works in the background.

Parameters:

Name Type Description Default
pid int

The process identifier

required
Source code in mlte/measurement/process_measurement.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def evaluate_async(self, pid: int, *args, **kwargs):
    """
    Start measuring the external process at `pid` in a background thread.

    Returns right after the thread has been started; the result is collected
    later with wait_for_output(). Extra arguments are forwarded unchanged.

    :param pid: The process identifier
    """
    # Drop any result or error left over from a previous run.
    self.stored_value = None
    self.error = ""

    worker = threading.Thread(
        target=self._evaluate_thread, args=(pid, *args), kwargs=kwargs
    )
    self.thread = worker
    worker.start()

generate_metadata()

Returns Measurement metadata with additional info.

Source code in mlte/measurement/process_measurement.py
72
73
74
75
76
77
78
79
80
def generate_metadata(self) -> MeasurementMetadata:
    """Build the base metadata and record the process group in it, when one is set."""
    base_metadata = super().generate_metadata()

    if not self.group:
        # No group (None or empty string): the base metadata is returned as is.
        return base_metadata

    base_metadata.additional_data[self.PROCESS_GROUP_KEY] = self.group
    return base_metadata

start_process(command) staticmethod

Initialize an external process running training or similar.

Parameters:

Name Type Description Default
command list[str]

A list with the full path to a process to run, followed by the string arguments for the process.

required

Returns:

Type Description
int

the id of the process that was created.

Source code in mlte/measurement/process_measurement.py
38
39
40
41
42
43
44
45
46
@staticmethod
def start_process(command: list[str]) -> int:
    """
    Spawn an external process running training or similar.

    :param command: A list with the full path to the process to run, followed by its string arguments.
    :return: the id of the process that was created.
    """
    executable = command[0]
    arguments = command[1:]
    return job.spawn_job(executable, arguments)

start_script(command) staticmethod

Initialize an external Python process running training or similar script.

Parameters:

Name Type Description Default
command list[str]

A list with the full path to a Python script with the training or equivalent process to run, and a list of string arguments for the process.

required

Returns:

Type Description
int

the id of the process that was created.

Source code in mlte/measurement/process_measurement.py
27
28
29
30
31
32
33
34
35
36
@staticmethod
def start_script(command: list[str]) -> int:
    """
    Spawn an external Python process running a training or similar script.

    :param command: A list with the full path to the Python script to run,
                    followed by its string arguments.
    :return: the id of the process that was created.
    """
    script_path = command[0]
    script_arguments = command[1:]
    return job.spawn_python_job(script_path, script_arguments)

wait_for_output(poll_interval=1)

Needed to get the output of a measurement executed in parallel using evaluate_async. Waits for the thread to finish.

Parameters:

Name Type Description Default
poll_interval int

The poll interval in seconds

1

Returns:

Type Description
Evidence

The resulting value of measurement execution, with semantics

Source code in mlte/measurement/process_measurement.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def wait_for_output(self, poll_interval: int = 1) -> Evidence:
    """
    Needed to get the output of a measurement executed in parallel using evaluate_async. Waits for the thread to finish.

    :param poll_interval: Maximum time, in seconds, spent in each wait on the thread.
    :return: The value stored by the measurement thread.
    :raises RuntimeError: If the measurement thread recorded an error.
    :raises Exception: If no thread was started, or it finished without storing a value.
    """
    worker = self.thread
    if worker is None:
        raise Exception(
            "Can't wait for value, no process is currently running."
        )

    # join() with a timeout returns as soon as the thread ends, so the
    # result is picked up right away instead of after sleeping out the
    # remainder of a poll interval.
    while worker.is_alive():
        worker.join(timeout=poll_interval)

    # If an exception was raised, return it here as an exception as well.
    if self.error != "":
        raise RuntimeError(self.error)

    if self.stored_value is None:
        raise Exception("No valid value was returned from measurement.")
    return self.stored_value