Synchronizer Reference
This section is a reference for the commonly used APIs exposed by the synchronizer framework.
Convenience Methods
As part of the model definition, it is possible to extend the autogenerated gRPC APIs with custom methods, for example, to make certain data easier to access from the synchronizers.
convenience_methods can be defined in a folder (convenience) that needs to be a child of the models_dir as per your configuration.
For example, if your configuration contains:
models_dir: "/opt/xos/synchronizers/<synchronizer_name>/models"
then your convenience methods need to be located in:
/opt/xos/synchronizers/<synchronizer_name>/models/convenience
Assuming our model definition looks like:
message MyModel (XOSBase){
required string first_name = 1 [null = False, blank = False];
required string last_name = 2 [null = False, blank = False];
}
here is an example of a basic convenience method that exposes a full_name property over the APIs used by the synchronizers:
from xosapi.orm import ORMWrapper, register_convenience_wrapper
from xosconfig import Config
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
class ORMWrapperMyModel(ORMWrapper):
    @property
    def full_name(self):
        return "%s %s" % (self.first_name, self.last_name)
register_convenience_wrapper("MyModel", ORMWrapperMyModel)
Note: The convenience methods will be loaded in all the synchronizer containers so that they can be used in multiple places.
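To make the registration pattern more concrete, here is a minimal, self-contained sketch of how a wrapper registry of this kind could work. It is not the xosapi implementation: register_wrapper, wrap, Wrapper, MyModelWrapper and RawMyModel are hypothetical names introduced only for illustration, and the sketch assumes that wrappers simply delegate unknown attributes to the wrapped object.

```python
# Hypothetical stand-ins: none of these names come from xosapi itself.
_wrappers = {}  # maps a model name to its wrapper class


def register_wrapper(model_name, wrapper_class):
    """Remember which wrapper class to use for a given model name."""
    _wrappers[model_name] = wrapper_class


class Wrapper(object):
    """Delegates unknown attribute lookups to the wrapped object."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __getattr__(self, name):
        # Only called when normal lookup fails, so properties defined on
        # subclasses take precedence over fields of the wrapped object.
        return getattr(self._wrapped, name)


class MyModelWrapper(Wrapper):
    @property
    def full_name(self):
        return "%s %s" % (self.first_name, self.last_name)


def wrap(model_name, obj):
    """Wrap obj in the registered class, or in the plain Wrapper otherwise."""
    return _wrappers.get(model_name, Wrapper)(obj)


class RawMyModel(object):
    """Plain object standing in for a record returned by the API."""

    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name


register_wrapper("MyModel", MyModelWrapper)
person = wrap("MyModel", RawMyModel("John", "Doe"))
print(person.full_name)
```

Objects wrapped under a model name with no registered wrapper still expose their plain fields, but not the extra property.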
Model Policies
Model Policies can be seen as post-save hooks, and they are generally defined in the xos/synchronizer/model_policies folder of your service.
Model policies are generally used to dynamically create a service chain: for example, when a ServiceInstance is created, the policy creates a ServiceInstance of the Service on its east side.
Note: You'll need to add this folder in your synchronizer configuration file as:
model_policies_dir: "/opt/xos/synchronizers/<synchronizer_name>/model_policies"
A model policy is a class that inherits from Policy:
from synchronizers.new_base.modelaccessor import MyServiceInstance, ServiceInstanceLink, model_accessor
from synchronizers.new_base.policy import Policy
class MyServiceInstancePolicy(Policy):
    model_name = "MyServiceInstance"
and overrides one or more of the following methods:
def handle_create(self, model):
def handle_update(self, model):
def handle_delete(self, model):
Here, model is the instance of the model that has been created, updated, or deleted, depending on the method.
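As an illustration of how the three handlers might fit together, the following sketch builds a one-hop service chain in memory. It runs without the framework: the Policy base class, FakeServiceInstance, its east_side attribute and the dispatch helper are all hypothetical stand-ins, and the real framework decides on its own when each handler is called.

```python
class Policy(object):
    """Hypothetical stand-in for the framework's Policy base class."""
    model_name = None

    def handle_create(self, model):
        pass

    def handle_update(self, model):
        pass

    def handle_delete(self, model):
        pass


class FakeServiceInstance(object):
    """Hypothetical in-memory model; real models come from modelaccessor."""

    def __init__(self, name):
        self.name = name
        self.east_side = None  # instance this one is chained to, if any


class MyServiceInstancePolicy(Policy):
    model_name = "MyServiceInstance"

    def handle_create(self, model):
        # A freshly created instance is handled like an updated one.
        self.handle_update(model)

    def handle_update(self, model):
        # Build the chain only once so repeated saves stay idempotent.
        if model.east_side is None:
            model.east_side = FakeServiceInstance(model.name + "-east")

    def handle_delete(self, model):
        # Drop the chained instance together with its owner.
        model.east_side = None


def dispatch(policy, event, model):
    """Hypothetical dispatcher: call the handler matching the event name."""
    handler = getattr(policy, "handle_" + event)
    handler(model)


policy = MyServiceInstancePolicy()
instance = FakeServiceInstance("vpn")
dispatch(policy, "create", instance)
first_east = instance.east_side
dispatch(policy, "update", instance)
print(instance.east_side.name)
```

Routing handle_create through handle_update is a design choice of this sketch: it keeps the chain-building logic in one place and makes it safe to run more than once.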
Sync Steps
Sync Steps are the pieces of code that map your models to your backend. You will need to define a sync step for each model.
Note: You'll need to add this folder in your synchronizer configuration file as:
steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/steps"
A Sync Step is a class that inherits from SyncStep:
from synchronizers.new_base.SyncInstanceUsingAnsible import SyncStep
from synchronizers.new_base.modelaccessor import MyModel
from xosconfig import Config
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
class SyncMyModel(SyncStep):
    provides = [MyModel]
    observes = MyModel
and provides these methods:
def sync_record(self, o):
    log.info("sync'ing object", object=str(o), **o.tologdict())

def delete_record(self, o):
    log.info("deleting object", object=str(o), **o.tologdict())
These methods are invoked whenever the model changes, and the changed model is passed as the argument. After they perform the operations required to bring the backend in line with the model state, the synchronizer framework updates the model with the necessary operational information.
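The sketch below shows one possible shape for the bodies of sync_record and delete_record, using a dictionary as the backend so that it runs on its own. FakeBackend, FakeMyModel and the backend constructor argument are assumptions made for the example; a real step would inherit from SyncStep and talk to an actual backend.

```python
class FakeMyModel(object):
    """Hypothetical stand-in for a model object handed to the step."""

    def __init__(self, id, first_name, last_name):
        self.id = id
        self.first_name = first_name
        self.last_name = last_name


class FakeBackend(object):
    """Hypothetical backend: a dictionary of users keyed by model id."""

    def __init__(self):
        self.users = {}


class SyncMyModel(object):
    # A real step would inherit from SyncStep; the backend argument is an
    # assumption made so the sketch runs without the framework.
    def __init__(self, backend):
        self.backend = backend

    def sync_record(self, o):
        # Writing the full desired state makes create and update identical,
        # so running the step twice leaves the backend unchanged.
        self.backend.users[o.id] = {
            "first_name": o.first_name,
            "last_name": o.last_name,
        }

    def delete_record(self, o):
        # pop() with a default tolerates records that were never synced.
        self.backend.users.pop(o.id, None)


backend = FakeBackend()
step = SyncMyModel(backend)
john = FakeMyModel(1, "John", "Doe")

step.sync_record(john)
step.sync_record(john)  # a retry does not duplicate anything
print(backend.users)
step.delete_record(john)
print(backend.users)
```

Keeping both methods idempotent is a choice of this sketch; it means a step that is retried after a partial failure does not leave duplicate or stale state behind.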
Pull Steps
Pull Steps can be used to observe the surrounding environment and update the data-model accordingly.
Note: You'll need to add this folder in your synchronizer configuration file as:
pull_steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/pull_steps"
A Pull Step is a class that inherits from PullStep:
from synchronizers.new_base.pullstep import PullStep
from synchronizers.new_base.modelaccessor import MyModel
from xosconfig import Config
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
class MyModelPullStep(PullStep):
    def __init__(self):
        # observe the same model that pull_records() creates
        super(MyModelPullStep, self).__init__(observed_model=MyModel)
and overrides the following method:
def pull_records(self):
    log.info("pulling MyModels")
    # create an empty model
    o = MyModel()
    # code to fetch information
    # populate the model
    o.first_name = "John"
    o.last_name = "Doe"
    o.no_sync = True  # required to prevent the synchronizer from being invoked and starting a loop
    o.save()
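The following self-contained sketch illustrates why a pull step sets no_sync and why it helps to check for existing records before saving. FakeStore, its sync_queue and the discovered list are hypothetical: they only model the idea that a save without no_sync would hand the object to the sync steps, and they do not reflect how the framework is implemented.

```python
class FakeMyModel(object):
    """Hypothetical stand-in for the MyModel class."""

    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name
        self.no_sync = False


class FakeStore(object):
    """Hypothetical data model: saved objects plus a queue of sync requests."""

    def __init__(self):
        self.objects = {}
        self.sync_queue = []

    def save(self, obj):
        self.objects[(obj.first_name, obj.last_name)] = obj
        if not obj.no_sync:
            # Objects saved without no_sync would be picked up by sync steps.
            self.sync_queue.append(obj)


def pull_records(store, discovered):
    """Create one record per discovered name, skipping names already stored."""
    for first_name, last_name in discovered:
        if (first_name, last_name) in store.objects:
            continue  # already known, so a repeated poll changes nothing
        o = FakeMyModel(first_name, last_name)
        o.no_sync = True  # the data came from the environment; do not push it back
        store.save(o)


store = FakeStore()
discovered = [("John", "Doe"), ("Jane", "Roe")]
pull_records(store, discovered)
pull_records(store, discovered)  # second polling round
print(len(store.objects), len(store.sync_queue))
```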
Event Steps
Event Steps are similar to pull steps in that they are often used to implement a flow of information from the environment into the data model. However, rather than using polling, event steps rely on externally generated events delivered via an event bus, such as Kafka.
Note: You'll need to add this folder in your synchronizer configuration file as:
event_steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/event_steps"
You'll also need to make sure the event bus endpoint is specified in the synchronizer config file. For example:
event_bus:
  endpoint: cord-kafka
  kind: kafka
An event step inherits from the EventStep class:
import json
from synchronizers.new_base.eventstep import EventStep
from synchronizers.new_base.modelaccessor import MyModel
from xosconfig import Config
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
class MyModelEventStep(EventStep):
    technology = "kafka"
    topics = ["MyTopic"]
    def __init__(self, *args, **kwargs):
        super(MyModelEventStep, self).__init__(*args, **kwargs)
Two important class members defined in each event step are technology and topics. technology tells the framework which type of event bus to use. The synchronizer framework currently implements only one bus interface, kafka. The topics member is a list of topics to listen on for events. The precise meaning of topics is left to the particular event bus technology in use.
Service-specific logic is implemented by overriding the process_event() method:
def process_event(self, event):
    value = json.loads(event.value)
    first_name = value["first_name"]
    last_name = value["last_name"]
    # See if the object already exists
    objs = MyModel.filter(first_name=first_name, last_name=last_name)
    if objs:
        return
    # Create a new object
    obj = MyModel()
    obj.first_name = first_name
    obj.last_name = last_name
    obj.save(always_update_timestamp=True)
In this example, we've assumed that the value of an event is a JSON-encoded dictionary containing the keys first_name and last_name. The event step checks whether an object with those fields already exists and, if not, creates it.
Unlike the Pull Step example, this example omits no_sync=True and adds always_update_timestamp=True to the save() call. This causes the synchronizer to execute any sync steps that exist for MyModel. Whether or not you want sync steps to run is an implementation decision and depends upon the design of your synchronizer.
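Events arrive from outside the synchronizer, so a step may want to tolerate payloads that are not in the expected shape. The sketch below shows one way the parsing and duplicate check could be written so that they can be exercised without Kafka or the framework. FakeEvent, parse_name_event and the known_names set are hypothetical; only the value attribute and the first_name and last_name keys are taken from the example above.

```python
import json
from collections import namedtuple

# Hypothetical event type: only the `value` attribute is modeled.
FakeEvent = namedtuple("FakeEvent", ["value"])


def parse_name_event(event):
    """Return (first_name, last_name), or None if the payload is unusable."""
    try:
        value = json.loads(event.value)
    except (ValueError, TypeError):
        # ValueError covers malformed JSON, TypeError a missing value.
        return None
    if not isinstance(value, dict):
        return None
    first_name = value.get("first_name")
    last_name = value.get("last_name")
    if not first_name or not last_name:
        return None
    return first_name, last_name


def process_event(event, known_names):
    """Record the name carried by the event; return True if it was new."""
    name = parse_name_event(event)
    if name is None or name in known_names:
        return False
    known_names.add(name)
    return True


known = set()
good = FakeEvent(json.dumps({"first_name": "John", "last_name": "Doe"}))
print(process_event(good, known))                    # first delivery
print(process_event(good, known))                    # duplicate delivery
print(process_event(FakeEvent("not json"), known))   # malformed payload
```

Silently ignoring malformed events is only one option; logging or counting them may be preferable, depending on the design of your synchronizer.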
Events can be sent to Kafka using any of the Kafka clients available for various languages, the Kafka command-line tools, and so on. A Python example follows:
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="cord-kafka")
# kafka-python expects the value as bytes unless a value_serializer is configured
producer.send("MyTopic", json.dumps({"first_name": "John", "last_name": "Doe"}).encode("utf-8"))
producer.flush()