Source code for noisify.reporters.reporter

"""
.. Dstl (c) Crown Copyright 2019
"""
from noisify.helpers import Fallible
from noisify.attribute_readers import dictionary_lookup
from pprint import pformat
from copy import deepcopy

from noisify.reporters.report import Report


class Reporter(Fallible):
    """
    The most important class in Noisify!

    Reporters define how objects should be changed. They can be as specific or
    as general as needed.

    :param attributes: Optional list of attribute readers. Each one is expected
        to expose ``attribute_identifier``, ``measure``, ``get_value`` and
        ``update_value``. When empty, attributes are introspected per object.
    :param attribute_type: Callable invoked with the truth object to produce
        attribute readers whenever ``attributes`` is empty.
    :param faults: Reporter-level faults, forwarded to ``Fallible.__init__``.
    """

    def __init__(self, attributes=None, attribute_type=dictionary_lookup, faults=None):
        self.attributes = attributes or []
        self.attribute_introspection_strategy = attribute_type
        Fallible.__init__(self, faults=faults)
        # Serial counter used as the default report identifier.
        self.report_index = 0

    def create_report(self, truth_object, identifier=None):
        """
        Calling the reporter object directly on an object will call this method.

        :param truth_object: Anything
        :param identifier: Optional identifier for the output report, defaults
            to a serial integer
        :return: A report for the given input object
        """
        # Compare against None explicitly so that legitimate falsy identifiers
        # (0, "", ...) supplied by the caller are not silently replaced.
        if identifier is None:
            identifier = self.report_index
        # The counter advances on every report, whether or not it was used.
        self.report_index += 1
        triggered_faults, measures = self._measure(truth_object)
        return Report(identifier, self._get_truth(truth_object), triggered_faults, measures)

    def _measure(self, truth_object):
        """
        Apply attribute-level measurement, then the reporter's own faults.

        :param truth_object: object to apply flaws to
        :return: a dictionary of faults triggered for different
            attribute_readers, and the final noised object
        """
        measurement, triggered_faults = self._get_attribute_measurements(truth_object)
        applied_faults, flawed_measurement = self.apply_all_faults(measurement)
        # Reporter-wide faults are recorded under a dedicated key.
        triggered_faults['reporter'] = applied_faults
        return triggered_faults, flawed_measurement

    def _get_attribute_measurements(self, truth_object):
        """
        Measure every attribute of the truth object onto a deep copy of it.

        :param truth_object: object to measure
        :return: tuple of (measured copy, faults triggered keyed by attribute
            identifier)
        """
        output_object = deepcopy(truth_object)
        attributes = self._get_or_introspect_attributes(truth_object)
        if not attributes:
            # Hand back the copy rather than the original, so whatever is
            # applied to the measurement afterwards never touches the
            # caller's object.
            return output_object, {}
        triggered_faults = {}
        for attribute in attributes:
            faults, new_value = attribute.measure(truth_object)
            attribute.update_value(output_object, new_value)
            triggered_faults[attribute.attribute_identifier] = faults
        return output_object, triggered_faults

    def _get_or_introspect_attributes(self, truth_object):
        """
        Return the configured attributes, or introspect them from the object.

        :param truth_object: object handed to the introspection strategy
        :return: list of attribute readers (possibly empty)
        """
        return self.attributes or list(self.attribute_introspection_strategy(truth_object))

    def _get_truth(self, truth_object):
        """
        Extract the un-noised values of the object.

        :param truth_object: object to read values from
        :return: the object itself when it has no attributes, otherwise a
            dictionary mapping attribute identifier to the attribute's value
        """
        attributes = self._get_or_introspect_attributes(truth_object)
        if not attributes:
            return truth_object
        return {
            attribute.attribute_identifier: attribute.get_value(truth_object)
            for attribute in attributes
        }

    def get_attribute_by_id(self, attribute_identifier):
        """
        Getter method for report attribute_readers.

        :param attribute_identifier: identifier to look up
        :return: the first attribute with a matching identifier, or ``None``
            when there is no match
        """
        for attribute in self.attributes:
            if attribute.attribute_identifier == attribute_identifier:
                return attribute
        return None

    def __call__(self, *args, **kwargs):
        """Shorthand for :meth:`create_report`."""
        return self.create_report(*args, **kwargs)

    def __add__(self, other):
        """
        Combine two reporters.

        The result is produced by ``Fallible.__add__``; its attributes are then
        replaced by the merge of ``other``'s attributes with its own.
        """
        output = Fallible.__add__(self, other)
        new_attributes = self.merge_attributes(other, output)
        output.attributes = new_attributes
        return output

    @staticmethod
    def merge_attributes(report1, report2):
        """
        Merges attribute_readers between two reporters, used for reporter
        addition.

        Attributes present in both reporters are added together; the rest are
        carried over unchanged. The result lists shared attributes first, then
        those only in ``report1``, then those only in ``report2``, each group
        in declaration order so the outcome is deterministic.

        :param report1: first reporter
        :param report2: second reporter
        :return: list of merged attribute readers
        """
        # dict.fromkeys de-duplicates identifiers while keeping first-seen order.
        report1_ids = dict.fromkeys(a.attribute_identifier for a in report1.attributes)
        report2_ids = dict.fromkeys(a.attribute_identifier for a in report2.attributes)

        new_attributes = []
        for attribute_id in report1_ids:
            if attribute_id in report2_ids:
                local = report1.get_attribute_by_id(attribute_id) + report2.get_attribute_by_id(attribute_id)
                new_attributes.append(local)
        for attribute_id in report1_ids:
            if attribute_id not in report2_ids:
                new_attributes.append(report1.get_attribute_by_id(attribute_id))
        for attribute_id in report2_ids:
            if attribute_id not in report1_ids:
                new_attributes.append(report2.get_attribute_by_id(attribute_id))
        return new_attributes

    def __repr__(self):
        """Pretty-printed summary of the reporter's attributes and faults."""
        return pformat({'Reporter': {'Attributes': list(self.attributes),
                                     'Faults': list(self.faults)}})