Usage

Getting started

The simplest way to use the SDK is to define your pipeline, entities and transformations in the same Python file:

from datetime import datetime, timezone

from flycs_sdk.entities import Entity
from flycs_sdk.pipelines import Pipeline, PipelineKind

# The first step is to define entities using the Flycs SDK
stage_config = {
    "raw": {"table_1": "1.0.0", "table_2": "1.0.0"},
    "staging": {"table_3": "1.0.0", "table_4": "1.0.0"},
    "data_warehouse": {"table_5": "1.1.0"},
}
entity1 = Entity(name="entity1", version="1.0.0", stage_config=stage_config)

# Once the entities are defined, we can create pipelines.
p1 = Pipeline(
    name="my_pipeline",
    version="1.0.0",
    schedule="* 12 * * *",  # this is using cron notation
    entities=[entity1],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
)

# To be discovered by the rest of the Flycs ecosystem, pipelines must be collected
# into a list named `pipelines` at the top level of the module.
pipelines = [p1]
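
The discovery convention comes down to one rule: the module exposes a top-level list named `pipelines`. The sketch below shows one way a tool could read that list using only the standard library. `discover_pipelines` and `discover_from_name` are hypothetical helpers for illustration. They are not part of flycs_sdk, and the real Flycs discovery mechanism may work differently.

```python
import importlib
from types import ModuleType
from typing import Any, List


def discover_pipelines(module: ModuleType) -> List[Any]:
    """Return the module's top-level `pipelines` list (hypothetical helper, not part of flycs_sdk)."""
    found = getattr(module, "pipelines", None)
    if not isinstance(found, list):
        raise LookupError(f"module {module.__name__!r} has no top-level list named 'pipelines'")
    return found


def discover_from_name(module_name: str) -> List[Any]:
    """Import a module by its dotted name, then read its `pipelines` list."""
    return discover_pipelines(importlib.import_module(module_name))


# Usage with an in-memory module standing in for your pipeline file
demo = ModuleType("demo")
demo.pipelines = ["my_pipeline"]
print(discover_pipelines(demo))
```

A module that forgets the `pipelines` list, or binds the name to something other than a list, fails loudly here instead of silently contributing no pipelines.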

While this is easy to use, you will usually want to give more structure to the different types of objects you create. For example, you could use one file per pipeline and one file per entity. Such a layout looks like this:

pipelines/
├── entity.py
├── __init__.py
└── pipeline.py

Let’s go over the content of each file:

  • entity.py: In this file, we define one entity.

from flycs_sdk.entities import Entity

stage_config = {
    "raw": {"table_1": "1.0.0", "table_2": "1.0.0"},
    "staging": {"table_3": "1.0.0", "table_4": "1.0.0"},
    "data_warehouse": {"table_5": "1.1.0"},
}
entity = Entity("entity", "1.0.0", stage_config)
  • pipeline.py: In this file, we define one pipeline and import the entity defined in entity.py.

from datetime import datetime, timezone
from flycs_sdk.pipelines import Pipeline, PipelineKind

from .entity import entity

my_pipeline = Pipeline(
    name="my_pipeline",
    version="1.0.0",
    schedule="* 12 * * *",  # this is using cron notation
    entities=[entity],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
)
  • __init__.py: In this file, we import the pipeline defined in pipeline.py and collect it into the `pipelines` list.

from .pipeline import my_pipeline
pipelines = [my_pipeline]

How to use parametrized pipelines to keep the code DRY?

You may end up with many pipelines that look almost exactly the same. To avoid this duplication, the SDK offers the ParametrizedPipeline and ParametrizedEntity classes, which let you pass parameters to your pipeline. The SDK then automatically generates a new pipeline for each possible combination of parameter values.

A set of parameters looks like this:

pipeline_parameters = {
    "language": ["nl", "fr"],
    "country": ["be", "en"],
}

This parameter set generates 4 pipelines, one for each possible combination of parameter values:

{"language": "nl", "country": "be"},
{"language": "nl", "country": "en"},
{"language": "fr", "country": "be"},
{"language": "fr", "country": "en"},
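
The expansion described above is a Cartesian product of the parameter values. The sketch below reproduces it with `itertools.product`, independently of the SDK. `expand_parameters` is a hypothetical helper that only illustrates the combinatorics; it is not the SDK's internal implementation.

```python
from itertools import product
from typing import Dict, List, Sequence


def expand_parameters(parameters: Dict[str, Sequence[str]]) -> List[Dict[str, str]]:
    """Return one dict per combination of parameter values (Cartesian product)."""
    keys = list(parameters)
    # product(*values) yields tuples in the same order as nested loops over the keys
    return [dict(zip(keys, values)) for values in product(*(parameters[k] for k in keys))]


pipeline_parameters = {
    "language": ["nl", "fr"],
    "country": ["be", "en"],
}

for combination in expand_parameters(pipeline_parameters):
    print(combination)
```

The number of generated combinations is the product of the lengths of the value lists (here 2 × 2 = 4), so it grows quickly as parameters are added. Using lists rather than sets for the values keeps the order of the combinations predictable, because the iteration order of a set of strings can change between Python runs.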

Parametrized pipelines and entities also let you introduce custom logic. Here is an example of how to use them:

from datetime import datetime, timezone

from flycs_sdk.entities import ParametrizedEntity
from flycs_sdk.pipelines import ParametrizedPipeline, PipelineKind

# To leverage the ParametrizedPipeline, create a new class that inherits from the ParametrizedPipeline class


class MyPipeline(ParametrizedPipeline):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    # You can override the `schedule` and `start_time` properties to introduce custom logic
    # in your pipeline.
    # Here we return a different schedule when the value of the `language` parameter is `fr`.
    @property
    def schedule(self):
        if self.parameters["language"] == "fr":
            return "* 1 * * *"
        else:
            return "* 12 * * *"


# It is also possible to customize the behavior of the entities.
# To do so, create a class that inherits from the ParametrizedEntity class and override the `get_stage_versions` method.


class MyEntity(ParametrizedEntity):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    # Here we introduce custom behavior when the stage is "staging" and the value of the `language` parameter is `fr`
    def get_stage_versions(self, stage, parameters):
        if stage == "staging" and parameters["language"] == "fr":
            return {"table_3": "1.1.0", "table_4": "2.0.0"}
        else:
            return self.stage_config[stage]


# Once you have your new classes defined, you can create the objects normally
stage_config = {
    "raw": {"table_1": "1.0.0", "table_2": "1.0.0"},
    "staging": {"table_3": "1.0.0", "table_4": "1.0.0"},
    "data_warehouse": {"table_5": "1.1.0"},
}
entity1 = MyEntity("entity1", "1.0.0", stage_config)

# Once the entities are defined, we can create pipelines.
p1 = MyPipeline(
    name="my_pipeline",
    version="1.0.0",
    schedule="* 12 * * *",
    entities=[entity1],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
    parameters={
        "language": ["fr", "en"],
        "country": ["be", "nl"],
    },  # this is an extra argument compared to the normal Pipeline class
)

# make the pipelines available to be discovered by the rest of the Flycs ecosystem
pipelines = [p1]
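
To see what the custom logic above produces for each combination, here is an SDK-free sketch that replays the same rules. It reimplements the `schedule` property and the `get_stage_versions` method as plain functions with hypothetical names (`schedule_for`, `stage_versions_for`) and does not call flycs_sdk. It assumes, as the example code implies, that each override receives a single combination such as {"language": "fr", "country": "be"}; how the SDK actually builds and passes those combinations is not shown here. Note that the stage check must compare the `stage` argument: comparing the literal string "stage" to "staging" is always false, so the custom branch would never run.

```python
from itertools import product
from typing import Dict

stage_config = {
    "raw": {"table_1": "1.0.0", "table_2": "1.0.0"},
    "staging": {"table_3": "1.0.0", "table_4": "1.0.0"},
    "data_warehouse": {"table_5": "1.1.0"},
}
parameters = {"language": ["fr", "en"], "country": ["be", "nl"]}


def schedule_for(params: Dict[str, str]) -> str:
    # Same rule as MyPipeline.schedule
    return "* 1 * * *" if params["language"] == "fr" else "* 12 * * *"


def stage_versions_for(stage: str, params: Dict[str, str]) -> Dict[str, str]:
    # Same rule as MyEntity.get_stage_versions
    if stage == "staging" and params["language"] == "fr":
        return {"table_3": "1.1.0", "table_4": "2.0.0"}
    return stage_config[stage]


# Walk every combination and show the schedule and staging versions it would get
keys = list(parameters)
for values in product(*(parameters[k] for k in keys)):
    combo = dict(zip(keys, values))
    print(combo, schedule_for(combo), stage_versions_for("staging", combo))
```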