End-to-End Example#
This section provides a complete, end-to-end example of an existing interpreter. It walks through each step, from initialising the interpreter to extracting metadata, processing measurements, and handling errors. It links directly to the “End-to-End Example” in the “composing adapters” document, showing how the adapter and interpreter from each section combine to create a functional adapter.

The example is a reduced version of the “Biolector” adapter in the code base. The Biolector is a discrete bioreactor: experiments have defined beginnings and endings. In practice, the Biolector that this adapter was developed for writes measurements to a file in real time. When an experiment begins, the file is created and populated with metadata. This metadata includes a mapping between measurement types and the integers that appear in each measurement row to denote which measurement was taken.

Like many pieces of equipment, the Biolector structures its data in ways that are specific to itself; for most equipment, handling this will constitute the most significant part of the interpreter. Interpreters are therefore highly specific to their data, so you do not need to understand every detail of this example; it is here simply to show how the elements come together.
1. Defining the constructor#
The constructor for this example is relatively simple. The class inherits from AbstractInterpreter, which is mandatory, and passes the error_holder argument to the base class. This specific interpreter also initialises some member variables, which are set when the metadata is extracted and used when a measurement is taken.
class Biolector1Interpreter(AbstractInterpreter):
    def __init__(self, error_holder=None):
        super().__init__(error_holder=error_holder)
        self._filtermap = None
        self._parameters = None
        self._sensors = None
2. Metadata function#
Because the Biolector runs discrete experiments, it has a start and stop phase. When an experiment starts, the machine inserts its metadata into the measurements file (the watch file), and this metadata must be stored so that the type of each measurement can be identified later. Therefore, the metadata function is overridden. The individual steps are described in comments in the function; in short, it builds a metadata payload from details such as protocol, device, user, and filter sets (many of the operations exist to extract information from the semi-structured, free-text CSV that the Biolector software creates). It also sets member variables that the measurement function later uses to identify the measurement type.
def metadata(self, data) -> dict[str, Any]:  # Any is imported from typing.
    filtersets = {}
    parameters = {}
    metadata = {}
    in_filtersets = False
    if not isinstance(data, list):
        self._handle_exception(InterpreterError("Can't extract metadata, input malformed"))
    # Perform different actions based on when a
    # specific type of data is encountered.
    for row in data:
        if not row or not row[0]:
            continue
        if row[0] == 'PROTOCOL':
            metadata['PROTOCOL'] = row[1]
        elif row[0] == 'DEVICE':
            metadata['DEVICE'] = row[1]
        elif row[0] == 'USER':
            metadata['USER'] = row[1]
        elif row[0] == 'FILTERSET':
            in_filtersets = True
            continue
        elif row[0] == 'READING':
            in_filtersets = False
            continue
        # If the current row contains filterset data.
        if in_filtersets and row[FILTERSET_ID_IDX].strip().isdigit():
            filterset_id = int(row[FILTERSET_ID_IDX].strip())
            # Extract one filter.
            filtersets[filterset_id] = {
                'FILTERNAME': row[FILTERNAME_IDX],
                'EX [nm]': row[EX_IDX],
                'EM [nm]': row[EM_IDX],
                'LAYOUT': row[LAYOUT_IDX],
                'FILTERNR': row[FILTERNR_IDX],
                'GAIN': row[GAIN_IDX],
            }
            # The target parameters are nested here too.
            if row[PROCESS_PARAM_IDX].startswith('SET '):
                param_name = row[PROCESS_PARAM_IDX]
                param_value = row[PROCESS_VALUE_IDX].strip()
                parameters[param_name] = param_value
    # Create an experiment id based on metadata.
    self.id = f'{metadata["PROTOCOL"]}-{metadata["DEVICE"]}-{metadata["USER"]}-{str(uuid.uuid4())}'
    # Map each filterset id to its filter name (used by measurement()).
    self._filtermap = {k: v["FILTERNAME"] for k, v in filtersets.items()}
    # Map each filter name to the rest of its settings; the deep copy
    # keeps the pop() from modifying the original filtersets.
    self._sensors = {v.pop('FILTERNAME'): v for v in copy.deepcopy(filtersets).values()}
    # Create payload for exporting.
    payload = {
        self.TIMESTAMP_KEY: datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        self.EXPERIMENT_ID_KEY: self.id
    }
    if parameters is not None:
        payload[self._TARGET_PARAMS_KEY] = parameters
    if self._sensors is not None:
        payload[self._SENSORS_KEY] = self._sensors
    return payload
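The two dictionary comprehensions near the end of metadata() are easy to misread, so the following is a minimal sketch of what they produce. The filter names and values are made-up sample data (an assumption for illustration), not taken from a real Biolector file, and only a subset of the filter fields is shown.

```python
import copy

# Hypothetical filter sets, shaped like the dict that metadata() builds.
filtersets = {
    1: {"FILTERNAME": "Biomass", "EX [nm]": "620", "EM [nm]": "620", "GAIN": "20"},
    2: {"FILTERNAME": "GFP", "EX [nm]": "488", "EM [nm]": "520", "GAIN": "50"},
}

# Filterset id -> filter name. Measurement rows carry the integer id.
filtermap = {k: v["FILTERNAME"] for k, v in filtersets.items()}

# Filter name -> remaining settings. pop() removes the name from the
# copied entry, so the original filtersets dict is left untouched.
sensors = {v.pop("FILTERNAME"): v for v in copy.deepcopy(filtersets).values()}

print(filtermap)
print(sensors["GFP"])
```

With these two lookups, a measurement row that carries filterset id 2 can be resolved first to the name "GFP" and then to its excitation and emission wavelengths.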
3. Measurement function#
Once the metadata is captured, the equipment periodically takes measurements. When the adapter system identifies that a measurement has been taken, the full content of the CSV file is provided to the interpreter. Therefore, in this case, the CSV data is iterated from the bottom, where the most recent measurements are, and iteration stops when a specific flag is encountered. The Biolector uses optical sensors to take all measurements, so several steps are needed to identify the correct measurement. For each measurement, the filter map (set in the metadata function) is used to look up the filter name, which in turn is used to look up the sensor data. The sensor data is then used to find a measurement object, which provides a standard term for the measurement and transforms the raw value into a standardised one. Finally, the result is packed into the payload. As noted before, this process is specific to the Biolector equipment. In general, a measurement function should convert measurement data to uniform terms and values using the measurement objects and then package this data into a structure aligned with the Influx data structure.
def measurement(self, data):
    # Create payload in line with the
    # influx object structure.
    measurements = {}
    update = {
        "measurement": "Biolector1",
        "tags": {"project": "indpensim"},
        "fields": measurements,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    # Unable to take measurements without mapping
    if self._filtermap is None:
        self._handle_exception(InterpreterError("No filters defined",
                                                severity=SeverityLevel.WARNING))
    # Iterate from the bottom of the data
    for row in data[::-1]:
        # Identify the end of the newest measurement
        if row[0] != "R":
            return update
        fs_code = int(row[4])
        # Maps code to the measurement type (filter)
        name = self._get_filtername(fs_code)
        well_num = row[1]
        amplitude = row[5]
        if name is not None:
            # Gets the data for a specific measurement type (Sensor)
            sensor_data = self._get_sensor_data(name)
            excitation = int(sensor_data["EX [nm]"])
            emission = int(sensor_data["EM [nm]"])
            # Find the specific measurement object.
            measurement = self._get_measurement_type(excitation, emission)
            measurement_term = measurement.term
            # Transforms using the measurement object.
            value = measurement.transform(amplitude)
        else:
            measurement_term = "unknown_measurement"
            value = amplitude
        if measurement_term not in measurements:
            measurements[measurement_term] = []
        # Pack measurement into payload.
        measurement_data = {
            "value": value,
            "name": name,
            "well_num": well_num}
        measurements[measurement_term].append(measurement_data)
    return update
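The bottom-up iteration and the shape of the resulting payload can be tried in isolation. The sketch below is a simplified stand-in, not the adapter's own code: `build_update` is a hypothetical helper, the sample rows are made up, the filter name is used directly as the term, and `float()` stands in for the measurement object's `transform()`. Only the column positions (well in column 1, filterset code in column 4, amplitude in column 5) follow the function above.

```python
from datetime import datetime


def build_update(rows, filtermap):
    """Collect the trailing "R" rows, newest first, into a payload."""
    fields = {}
    for row in reversed(rows):
        # Stop at the first row that is not part of the newest measurement.
        if not row or row[0] != "R":
            break
        name = filtermap.get(int(row[4]))
        # Simplification: the real code asks a measurement object for the term.
        term = name if name is not None else "unknown_measurement"
        fields.setdefault(term, []).append(
            {"value": float(row[5]), "name": name, "well_num": row[1]}
        )
    return {
        "measurement": "Biolector1",
        "fields": fields,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


# Made-up rows: one older row, then the newest block flagged with "R".
rows = [
    ["1", "A01", "", "0.1", "1", "10.0"],
    ["R", "A01", "", "0.2", "1", "12.5"],
    ["R", "A02", "", "0.2", "2", "3.0"],
]
update = build_update(rows, filtermap={1: "Biomass"})
print(update["fields"])
```

Rows whose filterset code is missing from the map are still reported, grouped under "unknown_measurement", which mirrors the fallback branch in the function above.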
4. Simulate function (Optional)#
Because the Biolector equipment writes measurements to a file, it can be mocked by inserting existing data into that file (provided the data has the same structure that the equipment’s software produces). The simulate() function here therefore simply reads existing measurements from read_file and writes them to write_file, which the larger adapter system will identify as new data, initiating the process as if a real measurement had been taken. simulate() first extracts and writes the metadata, then writes each measurement, waiting in between to mimic real measurement taking. Once all measurements have been written, the function exits. Note that this function is adapter-specific. Simulations are also optional; if one is implemented, it can live in either the adapter or the interpreter.
def simulate(self, read_file, write_file, wait):
    def write(chunk):
        # Append data to the watched file.
        with open(write_file, mode='a', newline='', encoding='latin-1') as file:
            writer = csv.writer(file, delimiter=';')
            writer.writerows(chunk)

    # Read existing data.
    with open(read_file, 'r', encoding='latin-1') as file:
        reader = csv.reader(file, delimiter=';')
        rows = list(reader)

    for index, row in enumerate(rows):
        if len(row) == 0:
            continue
        # Split the metadata (everything up to and including
        # the READING row) from the measurement data.
        if row[0] == "READING":
            metadata = rows[:index + 1]
            data = rows[index + 1:]
            break

    write(metadata)
    time.sleep(wait)
    chunk = [data.pop(0)]
    cur_read = data[0][0]
    for row in data:
        # Identify the end of the measurement.
        if row[0] != cur_read and row[0] != "R":
            write(chunk)
            # Start the next chunk with the current row so it is not dropped.
            chunk = [row]
            cur_read = row[0]
            time.sleep(wait)
        else:
            chunk.append(row)
    write(chunk)
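The split between metadata and measurement rows is the step most likely to fail on an unexpected file, because `metadata` and `data` are only assigned once a READING row is found. A minimal sketch of that step on its own is shown below. `split_at_reading` is a hypothetical helper and the sample text is made up; only the semicolon delimiter and the READING marker come from the function above.

```python
import csv
import io


def split_at_reading(rows):
    """Return (metadata_rows, data_rows), split after the first READING row."""
    for index, row in enumerate(rows):
        if row and row[0] == "READING":
            return rows[:index + 1], rows[index + 1:]
    # Fail loudly instead of continuing with undefined variables.
    raise ValueError("no READING row found")


# Made-up file content in the same semicolon-delimited layout.
text = "PROTOCOL;demo\nDEVICE;dev\n\nREADING;WELL\n1;A01\nR;A01\n"
rows = list(csv.reader(io.StringIO(text), delimiter=";"))

meta, data = split_at_reading(rows)
print(len(meta), data)
```

Raising an error here is one possible design choice; an interpreter could equally route the problem through its own error handling.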