Skip to content

process_measurement_group

Handling multiple process measurements at the same time more easily.

ProcessMeasurementGroup

Class that allows you to run multiple separte ProcessMeasurement classes on the same external process with an interface similar to regular Measurements.

Source code in mlte/measurement/process_measurement_group.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class ProcessMeasurementGroup:
    """
    Class that allows you to run multiple separte ProcessMeasurement classes
    on the same external process with an interface similar to regular Measurements.
    """

    def __init__(self):
        self.measurements: list[ProcessMeasurement] = []
        """List of process measurements to evaluate."""

    def add(self, measurement: ProcessMeasurement) -> None:
        """Add a measurement to the internal list."""
        self.measurements.append(measurement)

    def evaluate(
        self, command: list[str], inputs: dict[str, list[Any]] = {}
    ) -> dict[str, Evidence]:
        """Start an external process and run multiple process measurements on it."""
        # Start the external process to measure.
        pid = ProcessMeasurement.start_process(command)

        # Start up all measurement tools.
        for measurement in self.measurements:
            if not measurement.evidence_metadata:
                raise RuntimeError(
                    "Can't evaluate measurement before setting its id"
                )
            curr_inputs = (
                inputs[measurement.test_case_id]
                if measurement.test_case_id in inputs
                else []
            )

            # We ignore the first input, as it is the command we have already extracted.
            measurement.evaluate_async(pid, *curr_inputs[1:])

        # Wait for results. This could be threaded, but not clear if the benefits make it worth it.
        evidences: dict[str, Evidence] = {}
        for measurement in self.measurements:
            if measurement.evidence_metadata:
                evidence = measurement.wait_for_output().with_metadata(
                    measurement.evidence_metadata
                )
                evidences[measurement.evidence_metadata.test_case_id] = evidence

        return evidences

    @staticmethod
    def evaluate_groups(
        groups: dict[str, list[ProcessMeasurement]],
        inputs: dict[str, list[Any]],
    ) -> dict[str, Evidence]:
        """
        Given groups of measurements, run each group in a ProcessMeasurementGroup.

        :param groups: A dict of groups keyed by group id, each group having a list of ProcessMeasurements to be run together.
        :param inputs: A dict of inputs, per test case id.
        :returns: A dict of evidences, by test case id.
        """
        all_evidences: dict[str, Evidence] = {}
        for _, group in groups.items():
            # Create a ProcessMeasurementGroup for group.
            measurement_group = ProcessMeasurementGroup()
            for measurement in group:
                # Only consider measurements that have a case id properly set up.
                if measurement.test_case_id:
                    measurement_group.add(measurement)

                    # We assume all test cases in the group have the same command.
                    # TODO: see if we should improve this, or check for this.
                    if (
                        measurement.test_case_id in inputs
                        and len(inputs[measurement.test_case_id]) > 0
                    ):
                        command = inputs[measurement.test_case_id][0]
                    else:
                        raise RuntimeError(
                            "One ore more measurements did not have an input command."
                        )

            # Execute command and run all measurements.
            evidences = measurement_group.evaluate(command, inputs)
            all_evidences.update(evidences)

        return all_evidences

measurements = [] instance-attribute

List of process measurements to evaluate.

add(measurement)

Add a measurement to the internal list.

Source code in mlte/measurement/process_measurement_group.py
19
20
21
def add(self, measurement: ProcessMeasurement) -> None:
    """Add a measurement to the internal list."""
    self.measurements.append(measurement)

evaluate(command, inputs={})

Start an external process and run multiple process measurements on it.

Source code in mlte/measurement/process_measurement_group.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def evaluate(
    self, command: list[str], inputs: dict[str, list[Any]] = {}
) -> dict[str, Evidence]:
    """Start an external process and run multiple process measurements on it."""
    # Start the external process to measure.
    pid = ProcessMeasurement.start_process(command)

    # Start up all measurement tools.
    for measurement in self.measurements:
        if not measurement.evidence_metadata:
            raise RuntimeError(
                "Can't evaluate measurement before setting its id"
            )
        curr_inputs = (
            inputs[measurement.test_case_id]
            if measurement.test_case_id in inputs
            else []
        )

        # We ignore the first input, as it is the command we have already extracted.
        measurement.evaluate_async(pid, *curr_inputs[1:])

    # Wait for results. This could be threaded, but not clear if the benefits make it worth it.
    evidences: dict[str, Evidence] = {}
    for measurement in self.measurements:
        if measurement.evidence_metadata:
            evidence = measurement.wait_for_output().with_metadata(
                measurement.evidence_metadata
            )
            evidences[measurement.evidence_metadata.test_case_id] = evidence

    return evidences

evaluate_groups(groups, inputs) staticmethod

Given groups of measurements, run each group in a ProcessMeasurementGroup.

Parameters:

Name Type Description Default
groups dict[str, list[ProcessMeasurement]]

A dict of groups keyed by group id, each group having a list of ProcessMeasurements to be run together.

required
inputs dict[str, list[Any]]

A dict of inputs, per test case id.

required

Returns:

Type Description
dict[str, Evidence]

A dict of evidences, by test case id.

Source code in mlte/measurement/process_measurement_group.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@staticmethod
def evaluate_groups(
    groups: dict[str, list[ProcessMeasurement]],
    inputs: dict[str, list[Any]],
) -> dict[str, Evidence]:
    """
    Given groups of measurements, run each group in a ProcessMeasurementGroup.

    :param groups: A dict of groups keyed by group id, each group having a list of ProcessMeasurements to be run together.
    :param inputs: A dict of inputs, per test case id.
    :returns: A dict of evidences, by test case id.
    """
    all_evidences: dict[str, Evidence] = {}
    for _, group in groups.items():
        # Create a ProcessMeasurementGroup for group.
        measurement_group = ProcessMeasurementGroup()
        for measurement in group:
            # Only consider measurements that have a case id properly set up.
            if measurement.test_case_id:
                measurement_group.add(measurement)

                # We assume all test cases in the group have the same command.
                # TODO: see if we should improve this, or check for this.
                if (
                    measurement.test_case_id in inputs
                    and len(inputs[measurement.test_case_id]) > 0
                ):
                    command = inputs[measurement.test_case_id][0]
                else:
                    raise RuntimeError(
                        "One ore more measurements did not have an input command."
                    )

        # Execute command and run all measurements.
        evidences = measurement_group.evaluate(command, inputs)
        all_evidences.update(evidences)

    return all_evidences