Synchronizer Reference

This section is a reference for the commonly used APIs exposed by the synchronizer framework.

Convenience Methods

As part of the model definition, it is possible to extend the autogenerated gRPC APIs with custom methods, for example, to facilitate access to some kind of data from the synchronizers.

Convenience methods are defined in a folder named convenience, which must be a child of the models_dir set in your configuration.

For example, if your configuration contains:

models_dir: "/opt/xos/synchronizers/<synchronizer_name>/models"

then your convenience methods need to be located in /opt/xos/synchronizers/<synchronizer_name>/models/convenience.

Assuming our model definition looks like:

message MyModel (XOSBase){
    required string first_name = 1 [null = False, blank = False];
    required string last_name = 2 [null = False, blank = False];
}

Here is an example of a basic convenience method that exposes a full_name property over the APIs used by the synchronizers:

from xosapi.orm import ORMWrapper, register_convenience_wrapper

from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get('logging'))

class ORMWrapperMyModel(ORMWrapper):

    @property
    def full_name(self):
        return "%s %s" % (self.first_name, self.last_name)

register_convenience_wrapper("MyModel", ORMWrapperMyModel)

Note: The convenience methods will be loaded in all the synchronizer containers so that they can be used in multiple places.
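To check what the property computes without a running XOS core, the same logic can be exercised against a plain stand-in object. This is only a sketch: FullNameMixin and FakeMyModel are hypothetical names introduced for illustration and are not part of xosapi.

```python
class FullNameMixin(object):
    """Hypothetical mixin carrying the same property as ORMWrapperMyModel."""

    @property
    def full_name(self):
        return "%s %s" % (self.first_name, self.last_name)


class FakeMyModel(FullNameMixin):
    """Stand-in for a MyModel object returned by the API (assumption)."""

    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name


person = FakeMyModel("John", "Doe")
print(person.full_name)  # prints: John Doe
```

Keeping the property free of side effects makes it safe to read from any synchronizer container that loads the wrapper.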

Model Policies

Model Policies can be seen as post-save hooks and they are generally defined in the xos/synchronizer/model_policies folder of your service.

Model policies are generally used to dynamically create a service chain: for example, when a ServiceInstance is created, its policy can create a ServiceInstance of the Service on its east side.

Note: You'll need to add this folder in your synchronizer configuration file as:

model_policies_dir: "/opt/xos/synchronizers/<synchronizer_name>/model_policies"

A model policy is a class that inherits from Policy:

from synchronizers.new_base.modelaccessor import MyServiceInstance, ServiceInstanceLink, model_accessor
from synchronizers.new_base.policy import Policy

class MyServiceInstancePolicy(Policy):
    model_name = "MyServiceInstance"

and overrides one or more of the following methods:

def handle_create(self, model):
def handle_update(self, model):
def handle_delete(self, model):

Where model is the instance of the model that has been created, updated, or deleted.
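A minimal sketch of a policy that overrides all three handlers is shown here. To keep it runnable outside a synchronizer container, Policy and FakeServiceInstance are local stand-ins (assumptions); in a real service the class would inherit from synchronizers.new_base.policy.Policy and receive model objects from the framework. The handled list is a hypothetical attribute used only to make the call order visible.

```python
class Policy(object):
    """Stand-in for synchronizers.new_base.policy.Policy (assumption)."""


class FakeServiceInstance(object):
    """Hypothetical model object; real ones come from the model accessor."""

    def __init__(self, name):
        self.name = name


class MyServiceInstancePolicy(Policy):
    model_name = "MyServiceInstance"

    def __init__(self):
        # Hypothetical record of handled events, kept only for illustration.
        self.handled = []

    def handle_create(self, model):
        self.handled.append(("create", model.name))
        # Creation reuses the update logic in this sketch.
        self.handle_update(model)

    def handle_update(self, model):
        self.handled.append(("update", model.name))

    def handle_delete(self, model):
        self.handled.append(("delete", model.name))


policy = MyServiceInstancePolicy()
instance = FakeServiceInstance("instance-1")
policy.handle_create(instance)
policy.handle_delete(instance)
print(policy.handled)
```

Delegating handle_create to handle_update is one possible arrangement when both events need the same reaction; it is a design choice of this sketch, not a framework requirement.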

Sync Steps

Sync Steps are the pieces of code that provide the mapping between your models and your backend. You will need to define a sync step for each model.

Note: You'll need to add this folder in your synchronizer configuration file as:

steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/steps"

A Sync Step is a class that inherits from SyncStep:


from synchronizers.new_base.SyncInstanceUsingAnsible import SyncStep
from synchronizers.new_base.modelaccessor import MyModel

from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get('logging'))

class SyncMyModel(SyncStep):
    provides = [MyModel]

    observes = MyModel

and provides these methods:

def sync_record(self, o):
    log.info("sync'ing object", object=str(o), **o.tologdict())
def delete_record(self, o):
    log.info("deleting object", object=str(o), **o.tologdict())

These methods are invoked any time the model changes, and the changed model is passed as the argument. After they perform the operations required to sync the model state with the backend state, the synchronizer framework updates the model with the needed operational information.
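The following sketch illustrates one way sync_record and delete_record could map a model onto a backend. It runs outside the framework, so SyncStep and FakeMyModel are local stand-ins (assumptions), and the backend is a hypothetical dictionary in place of a real external system. A real step would also declare provides and observes as shown earlier.

```python
class SyncStep(object):
    """Stand-in for the framework's SyncStep base class (assumption)."""


class FakeMyModel(object):
    """Hypothetical model object with the fields from the MyModel definition."""

    def __init__(self, id, first_name, last_name):
        self.id = id
        self.first_name = first_name
        self.last_name = last_name


class SyncMyModel(SyncStep):
    def __init__(self, backend):
        # backend is a hypothetical dict standing in for the external system.
        self.backend = backend

    def sync_record(self, o):
        # Write the full desired state, so repeating the call is harmless.
        self.backend[o.id] = {
            "first_name": o.first_name,
            "last_name": o.last_name,
        }

    def delete_record(self, o):
        # pop() with a default tolerates records that are already gone.
        self.backend.pop(o.id, None)


backend = {}
step = SyncMyModel(backend)
person = FakeMyModel(1, "John", "Doe")
step.sync_record(person)
step.sync_record(person)  # second call leaves the backend unchanged
```

Because the methods run whenever the model changes, writing them so that a repeated call produces the same backend state is a reasonable precaution.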

Pull Steps

Pull Steps can be used to observe the surrounding environment and update the data-model accordingly.

Note: You'll need to add this folder in your synchronizer configuration file as:

pull_steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/pull_steps"

A Pull Step is a class that inherits from PullStep:

from synchronizers.new_base.pullstep import PullStep
from synchronizers.new_base.modelaccessor import MyModel

from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get('logging'))

class MyModelPullStep(PullStep):
    def __init__(self):
        super(MyModelPullStep, self).__init__(observed_model=MyModel)

and overrides the following method:

def pull_records(self):
    log.info("pulling MyModels")
    # create an empty model
    o = MyModel()
    # code to fetch information
    # populate the model
    o.first_name = "John"
    o.last_name = "Doe"
    o.no_sync = True  # required to prevent the synchronizer from being invoked and starting a loop
    o.save()
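Since a pull step polls repeatedly, it may be worth skipping records that were already created. The sketch below shows that idea with in-memory stand-ins: FakeMyModel and fetch_people are hypothetical names (assumptions) replacing the real MyModel accessor and the environment query, so the code can run outside a synchronizer.

```python
class FakeMyModel(object):
    """Hypothetical in-memory stand-in for the MyModel accessor."""

    saved = []

    def __init__(self):
        self.first_name = None
        self.last_name = None
        self.no_sync = False

    def save(self):
        FakeMyModel.saved.append(self)


def fetch_people():
    """Hypothetical environment query; a real step would ask a device or API."""
    return [("John", "Doe"), ("Jane", "Roe"), ("John", "Doe")]


def pull_records():
    # Names that already exist in the (fake) data model.
    known = set((m.first_name, m.last_name) for m in FakeMyModel.saved)
    for first_name, last_name in fetch_people():
        if (first_name, last_name) in known:
            continue  # already present, nothing to create
        o = FakeMyModel()
        o.first_name = first_name
        o.last_name = last_name
        o.no_sync = True  # keep the synchronizer from being triggered
        o.save()
        known.add((first_name, last_name))


pull_records()
print(len(FakeMyModel.saved))
```

Running pull_records a second time creates nothing new, which is the behavior a polling step usually wants.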

Event Steps

Event Steps are similar to pull steps in that they are often used to implement a flow of information from the environment into the data model. However, rather than using polling, event steps rely on externally generated events delivered via an event bus, such as Kafka.

Note: You'll need to add this folder in your synchronizer configuration file as:

event_steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/event_steps"

You'll also need to make sure the event bus endpoint is specified in the synchronizer config file. For example:

event_bus:
  endpoint: cord-kafka
  kind: kafka

An event step inherits from the EventStep class:


import json
from synchronizers.new_base.eventstep import EventStep
from synchronizers.new_base.modelaccessor import MyModel
from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get('logging'))

class MyModelEventStep(EventStep):
    technology = "kafka"
    topics = ["MyTopic"]

    def __init__(self, *args, **kwargs):
        super(MyModelEventStep, self).__init__(*args, **kwargs)

Two important class members that are defined in each event step are technology and topics. technology tells what type of event bus to use. There's currently only one bus interface implemented by the synchronizer framework, and that is kafka. The topics member is a list of topics that will be listened on for events. The precise meaning of topics is left to the particular event bus technology that is in use.

Service-specific logic is implemented by overriding the process_event() method:


    def process_event(self, event):
        value = json.loads(event.value)
        first_name = value["first_name"]
        last_name = value["last_name"]

        # See if the object already exists
        objs = MyModel.filter(first_name=first_name, last_name=last_name)
        if objs:
            return

        # Create a new object
        obj = MyModel()
        obj.first_name = first_name
        obj.last_name = last_name
        obj.save(always_update_timestamp = True)

In this example we've made the assumption that the value of an event is a json-encoded dictionary containing the keys first_name and last_name. The event step in this case checks to see if an object with those fields already exists, and if not then it creates the object.
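Because process_event above trusts the event value to be well formed, a small validation helper may be useful when events come from producers you do not control. The sketch below is an assumption layered on top of the example: parse_event_value is a hypothetical function, not part of the synchronizer framework, and it uses only the standard json module.

```python
import json


def parse_event_value(raw):
    """Return (first_name, last_name), or None if the value is unusable."""
    try:
        value = json.loads(raw)
    except (ValueError, TypeError):
        return None  # not valid JSON, or not a string/bytes value
    if not isinstance(value, dict):
        return None  # valid JSON, but not a dictionary
    first_name = value.get("first_name")
    last_name = value.get("last_name")
    if not first_name or not last_name:
        return None  # a required key is missing or empty
    return first_name, last_name


good = parse_event_value('{"first_name": "John", "last_name": "Doe"}')
bad = parse_event_value("not json")
print(good, bad)
```

Inside process_event, a None result could simply be logged and ignored instead of raising a KeyError.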

In this example, we've differed from the Pull Step example in that we omitted no_sync=True and added always_update_timestamp = True to the save() call. This causes the synchronizer to execute any sync steps that exist for MyModel. Whether or not you want sync steps to run is an implementation decision that depends on the design of your synchronizer.

Sending an event to Kafka can be done using a variety of Kafka clients for various languages, Kafka command-line tools, etc. A Python example is as follows:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="cord-kafka")
# kafka-python expects bytes when no value_serializer is configured
producer.send("MyTopic", json.dumps({"first_name": "John", "last_name": "Doe"}).encode("utf-8"))
producer.flush()
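The payload can also be built and checked without a broker, which helps confirm that the producer emits what the event step expects. In this sketch build_event_payload is a hypothetical helper (an assumption, not part of any Kafka client), and the decode step mirrors the json.loads call in process_event.

```python
import json


def build_event_payload(first_name, last_name):
    """Hypothetical helper: JSON-encode the fields and return UTF-8 bytes."""
    body = {"first_name": first_name, "last_name": last_name}
    return json.dumps(body).encode("utf-8")


payload = build_event_payload("John", "Doe")
# With a reachable broker, the bytes could be sent as:
#   producer.send("MyTopic", payload)

# Decode the payload the same way the event step would.
decoded = json.loads(payload.decode("utf-8"))
print(decoded["first_name"], decoded["last_name"])
```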
