Implementing a function#
Functions are split into two parts: A function definition class, which acts as a factory method for the actual function class implementing the function interface. The function definition specifies the input and output dataschemes and is used for the planning. Executors use the function definition to instantiate function objects which do the actual computations.
Implementing the function#
When implementing your own functions, you should first start with the function implementation.
For this example, we will implement a function which takes two series and either returns the first or the second one based on a static parameter.
Boilerplate#
Declare a new class and derive it from FunctionBase, which is the base class implementing the IFunction interface.
from typing import Optional, Iterable
from openmnglab.functions.base import FunctionBase
from openmnglab.model.datamodel.interface import IDataContainer
class SeriesChooserFunc(FunctionBase):
def execute(self) -> Iterable[IDataContainer]:
pass
def set_input(self, *data_in: IDataContainer):
pass
You function implementation must provide the methods set_input and execute. set_input sets the input data of the function and with a call to
execute, the function should execute on the previously set data.
Implementation#
Note
The contracts defined by the model between an executor and a function give you some guarantees when implementing the function:
executemay only be called after a successfull call toset_inputAll data passed to
set_inputmust adhere to the dataformat specified by the function definition
If your function fails because of one of those guarantees were ignored by the executor implementation, that is not your problem.
While the execution framework can -in theory- handle all types data, the main way to pass data between functions is by using pandas data structures, wrapped in a
PandasContainer. You must always name all elements (indizes, columns, series) and provide a quantity for each element. Please see Data exchange for a reasoning behind this
The function should take two inputs and return a single one PandasContainer. We will implement the constructor and specify the type annotations of our class:
from typing import Optional, Iterable
import pandas as pd
from openmnglab.datamodel.pandas.model import PandasContainer
from openmnglab.functions.base import FunctionBase
from openmnglab.model.datamodel.interface import IDataContainer
class SeriesChooserFunc(FunctionBase):
def __init__(self, chosen_series: int):
# annotate with PandasContainer type. No optional required, as it should always be initialized when a read-access occours.
self.series_a_container: PandasContainer[pd.Series] = None
self.series_b_container: PandasContainer[pd.Series] = None
self.chosen_series = chosen_series
def execute(self) -> tuple[PandasContainer[pd.Series]]:
pass
def set_input(self, series_a_container: PandasContainer[pd.Series], series_b_container: PandasContainer[pd.Series]):
self.series_a_container = series_a_container
self.series_b_container = series_b_container
Now implement the (trivial) execution function. For the sake of demonstration, we will not simple return the chosen container, but will construct a new one with the chosen series. Remember that a PandasContainer requires a dictionary which contains the unit of each column and index of the contained pandas structure.
Important
Always make sure to include a comma at the end of execute to return a tuple! Returning a single item not wrapped in a tuple is not supported at this time.
import pandas as pd
from openmnglab.datamodel.pandas.model import PandasContainer
from openmnglab.functions.base import FunctionBase
class SeriesChooserFunc(FunctionBase):
def __init__(self, chosen_series: int):
# annotate with PandasContainer type. No optional required, as it should always be initialized when a read-access occours.
self.series_a_container: PandasContainer[pd.Series] = None
self.series_b_container: PandasContainer[pd.Series] = None
self.chosen_series = chosen_series
def execute(self) -> tuple[PandasContainer[pd.Series]]:
chosen_container = self.series_a_container if self.chosen_series == 0 else self.series_b_container
return_container = PandasContainer(chosen_container.data, chosen_container.units)
# remember to return a tuple
return return_container,
def set_input(self, series_a_container: PandasContainer[pd.Series], series_b_container: PandasContainer[pd.Series]):
self.series_a_container = series_a_container
self.series_b_container = series_b_container
Test#
You should now test your function to make sure it works like intended:
import quantities as pq
# construct the series and name them (and their index)
ser_a = pd.Series([1,2,3], name="Series A")
ser_b = pd.Series([4,5,6], name="Series B")
for s in (ser_a,ser_b):
s.index.name = "number idx"
# put both series in a container, make the series themself and its index dimensionless
dimless_container = lambda s: PandasContainer(s, units={s.name:pq.dimensionless, s.index.name:pq.dimensionless})
ser_a_container = dimless_container(ser_a)
ser_b_container = dimless_container(ser_b)
# construct an instance of the function, select the second series
chooser = SeriesChooserFunc(1)
# set the input
chooser.set_input(ser_a_container, ser_b_container)
# execute it; remember to include the comma to unpack the returned tuple
chosen_series_container, = chooser.execute()
print(chosen_series_container)
Implementing the function definition#
Right now, our function is working, but cannot be used in the execution framework. To use it in an execution flow, we need to implement a function definition.
Function definitions are defined in IFunctionDefinition. Function definitions provide data schemas to verify that only compatible
in- and outputs are connected during planning. They also must provide a hash value which uniquely identifies the function and its configuration. That hash must be the same for all
instances of the function which are running the same configuration.
Boilerplate#
We will derive our implementation from the base class FunctionDefinitionBase,
which implements some default behaviours to reduce the required amount of boilerplate.
class SeriesChooser(FunctionDefinitionBase):
def __init__(self):
super().__init__('')
@property
def config_hash(self) -> bytes:
return super().config_hash
@property
def consumes(self) -> Optional[Sequence[IInputDataScheme] | IInputDataScheme]:
pass
def production_for(self, *inputs: IInputDataScheme) -> Optional[Sequence[IOutputDataScheme] | IOutputDataScheme]:
pass
def new_function(self) -> IFunction:
pass
Implementation#
Function identification#
First thing to do is to choose a unique identifier for our function and initialize the super constructor with it, i.e. openmgnlab.qs.serieschooser.
Note
For your own functions include something like your git username in the identifier to decrease the chance of a collision.
def __init__(self):
super().__init__('openmgnlab.qs.serieschooser')
Config hash#
We will have to specify the config_hash. This property is used to calculate a hash unique for the configuration of the function, so calculated data can be uniquely identified later.
Add our sole config argument to the constructor and return the hash of it. You should use openmnglab.util.hashing.Hash for an easy, fluent and typed wrapper around Pythons hashing functions.
Note
FunctionDefinitionBase implements identifying_hash
by combining the hash of the function id with the result of config_hash.
Note
Data ids is calculated by combining the result of identifying_hash with the number of the output and
the ids of the input data (if any). This allows unique identification of data throughout the execution.
@property
def config_hash(self) -> bytes:
return Hash().int(self.chosen_series).digest()
Data schemas#
In the next step, we will define the dataschemas our function will consume and produce. As we do not have specific requirements, we can just return a new PandasInputDataScheme with an empty SeriesSchema, to just accept any series whatsoever.
@property
def consumes(self) -> tuple[PandasInputDataScheme[SeriesSchema],PandasInputDataScheme[SeriesSchema]]:
return PandasInputDataScheme(SeriesSchema()),PandasInputDataScheme(SeriesSchema())
we will also have to implement a function that will return the data schema for a concrete set of inputs. In our case, we can predict the concrete schema of the returned series based on our chosen_series parameter:
def production_for(self, schema_a: PandasOutputDataScheme[SeriesSchema], schema_b: PandasOutputDataScheme[SeriesSchema]) -> tuple[PandasOutputDataScheme[SeriesSchema]]:
return schema_a if self.chosen_series == 0 else schema_b,
Finally, we have to implement the factory function
def new_function(self) -> SeriesChooserFunc:
return SeriesChooserFunc(self.chosen_series)
Complete implementation#
import pandas as pd
from pandera import SeriesSchema
from openmnglab.datamodel.pandas.model import PandasContainer, PandasOutputDataScheme, \
PandasInputDataScheme
from openmnglab.functions.base import FunctionBase, FunctionDefinitionBase
from openmnglab.util.hashing import Hash
class SeriesChooserFunc(FunctionBase):
def __init__(self, chosen_series: int):
# annotate with PandasContainer type. No optional required, as it should always be initialized when a read-access occours.
self.series_a_container: PandasContainer[pd.Series] = None
self.series_b_container: PandasContainer[pd.Series] = None
self.chosen_series = chosen_series
def execute(self) -> tuple[PandasContainer[pd.Series]]:
chosen_container = self.series_a_container if self.chosen_series == 0 else self.series_b_container
return_container = PandasContainer(chosen_container.data, chosen_container.units)
# remember to return a tuple
return return_container,
def set_input(self, series_a_container: PandasContainer[pd.Series], series_b_container: PandasContainer[pd.Series]):
self.series_a_container = series_a_container
self.series_b_container = series_b_container
class SeriesChooser(FunctionDefinitionBase):
def __init__(self, chosen_series: int):
super().__init__('openmgnlab.qs.serieschooser')
self.chosen_series = chosen_series
@property
def config_hash(self) -> bytes:
return Hash().int(self.chosen_series).digest()
@property
def consumes(self) -> tuple[PandasInputDataScheme[SeriesSchema], PandasInputDataScheme[SeriesSchema]]:
return PandasInputDataScheme(SeriesSchema()), PandasInputDataScheme(SeriesSchema())
def production_for(self, schema_a: PandasOutputDataScheme[SeriesSchema],
schema_b: PandasOutputDataScheme[SeriesSchema]) -> tuple[PandasOutputDataScheme[SeriesSchema]]:
return schema_a if self.chosen_series == 0 else schema_b,
def new_function(self) -> SeriesChooserFunc:
return SeriesChooserFunc(self.chosen_series)