Pipeline triggers

Flycs lets you configure triggers for your pipeline. A trigger allows your pipeline to be started by an external event. The supported trigger types are:

  • PubSub topic

  • Google Cloud Storage

  • Other pipeline

PubSub topic

This trigger creates a subscription to the topic and then waits for a message to arrive. Once a message is received, the rest of the pipeline is executed.

The PubSub trigger has two properties you can configure:

  • topic: The full path to a PubSub topic. The topic MUST exist before the pipeline with the trigger is executed.

  • subscription_project: Optional property that lets you choose the project in which the subscription to the topic is created.
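The `topic` property expects the fully qualified topic name, not just the short topic ID. The sketch below is a hypothetical helper (`parse_topic_path` is not part of the Flycs SDK) that checks a value against the `projects/<project>/topics/<topic>` form used in the examples on this page, which can catch a short name before the pipeline is deployed. It does not check that the topic actually exists.

```python
from __future__ import annotations

import re

# Hypothetical helper, not part of the Flycs SDK.
# Matches the fully qualified form projects/<project>/topics/<topic>.
_TOPIC_PATH = re.compile(r"projects/(?P<project>[^/]+)/topics/(?P<topic>[^/]+)")


def parse_topic_path(path: str) -> tuple[str, str]:
    """Split a full PubSub topic path into (project, topic), or raise ValueError."""
    match = _TOPIC_PATH.fullmatch(path)
    if match is None:
        raise ValueError(f"expected projects/<project>/topics/<topic>, got {path!r}")
    return match.group("project"), match.group("topic")


print(parse_topic_path("projects/my_project/topics/my_topic"))  # ('my_project', 'my_topic')
```

Passing only `"my_topic"` raises a `ValueError`, since the project segment is missing.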

Here is an example Pipeline definition using the PubSub trigger:

name: demo
kind: vanilla
version: 1.0.0
entities:
  - name: demo
    version: 1.0.0
    stage_config:
      - name: preamble
        versions: {}
      - name: staging
        versions:
          simple_copy: "1.0.0"
      - name: datalake
        versions:
          simple_copy: "1.0.0"
          history: "1.0.0"
      - name: data_warehouse
        versions:
          simple_copy: "1.0.0"
          manipulating_pii_fields: "1.0.0"
      - name: data_mart
        versions:
          simple_copy: "1.0.0"
          salary: "1.0.0"
start_time: "2021-01-01T00:00:00"
trigger:
    type: "pubsub"
    topic: "projects/my_project/topics/my_topic"
    subscription_project: my_other_project

The same pipeline defined using the Flycs SDK:

from datetime import datetime, timezone

from flycs_sdk.pipelines import Pipeline, PipelineKind
from flycs_sdk.triggers import PubSubTrigger
from .entities import entity  # entities are defined in another module; we just import them here.

p = Pipeline(
    name="demo",
    version="1.0.0",
    entities=[entity],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
    trigger=PubSubTrigger(topic="projects/my_project/topics/my_topic", subscription_project="my_other_project"),
)

Note that the schedule property is not required in the pipeline definition when you specify a trigger. This is because Flycs automatically configures your pipeline to be scheduled every second, so that it is always running and waiting for the trigger event to occur.

Google Cloud Storage

The event sent from GCS can be of three types:

  • Trigger when an object exists: As soon as the watched object is created and for as long as the object exists, the DAG is triggered.

  • Trigger when an object is updated: When the update time of an object changes, the DAG is triggered.

  • Trigger by watching a prefix in a bucket: As soon as the prefix or any object under this prefix exists, the DAG is triggered.
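The difference between the three trigger types is easiest to see as a condition evaluated each time the bucket is checked. The toy model below is not how Flycs implements these triggers: it represents a bucket listing as a plain dict mapping object names to their last update time, and all function and class names are hypothetical. One assumption to note: for the update trigger, the first time the object is seen only records a baseline update time, and the trigger fires on later changes.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional

# Toy bucket listing: object name -> last update time (only equality is compared).
Listing = Dict[str, str]


def object_exists_fires(listing: Listing, obj: str) -> bool:
    # gcs_object_exist: true on every check for as long as the object is present.
    return obj in listing


@dataclass
class ObjectChangeWatcher:
    # gcs_object_change: remembers the last update time it observed.
    obj: str
    last_seen: Optional[str] = None

    def check(self, listing: Listing) -> bool:
        updated = listing.get(self.obj)
        if updated is None:
            return False  # object absent: nothing to compare
        # Assumption: the first observation only sets the baseline.
        fired = self.last_seen is not None and updated != self.last_seen
        self.last_seen = updated
        return fired


def prefix_watch_fires(listing: Listing, prefix: str) -> bool:
    # gcs_watch_prefix: true as soon as any object name starts with the prefix.
    return any(name.startswith(prefix) for name in listing)


watcher = ObjectChangeWatcher("subdir/my_object")
for snapshot in (
    {"subdir/my_object": "2021-01-01T00:00:00"},
    {"subdir/my_object": "2021-01-01T00:00:00"},
    {"subdir/my_object": "2021-01-02T00:00:00"},
):
    # prints: True False, then True False, then True True
    print(object_exists_fires(snapshot, "subdir/my_object"), watcher.check(snapshot))
```

The exists check fires on all three snapshots, while the update check fires only on the third, when the update time changes.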

Here are YAML examples showing how to define these GCS triggers, each followed by the equivalent Python SDK object. For brevity, only the trigger block is shown.

Object exist trigger:

trigger:
    type: "gcs_object_exist"
    bucket: "gcs-trigger"
    object: "subdir/my_object"

GCSObjectExistTrigger(
    bucket="gcs-trigger",
    object="subdir/my_object"
)

Object update trigger:

trigger:
    type: "gcs_object_change"
    bucket: "gcs-trigger"
    object: "subdir/my_object"

GCSObjectChangeTrigger(
    bucket="gcs-trigger",
    object="subdir/my_object"
)

Prefix watch trigger:

trigger:
    type: "gcs_watch_prefix"
    bucket: "gcs-trigger"
    prefix: "my_prefix"

GCSPrefixWatchTrigger(
    bucket="gcs-trigger",
    prefix="my_prefix"
)
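Each YAML `type` value above corresponds to one SDK class and a small set of keys. The hypothetical helper below (`check_trigger_block` and `TRIGGER_SCHEMAS` are not part of Flycs) encodes that correspondence as it appears on this page, so a hand-written trigger block can be checked before deployment. It also rejects keys not shown in these examples, which may be stricter than Flycs itself. The block is a plain dict, for example the result of loading the YAML with a YAML parser.

```python
from typing import Dict, FrozenSet, Mapping, Tuple

# Hypothetical table built from the examples on this page:
# type -> (SDK class name, required keys, optional keys)
TRIGGER_SCHEMAS: Dict[str, Tuple[str, FrozenSet[str], FrozenSet[str]]] = {
    "pubsub": ("PubSubTrigger", frozenset({"topic"}), frozenset({"subscription_project"})),
    "gcs_object_exist": ("GCSObjectExistTrigger", frozenset({"bucket", "object"}), frozenset()),
    "gcs_object_change": ("GCSObjectChangeTrigger", frozenset({"bucket", "object"}), frozenset()),
    "gcs_watch_prefix": ("GCSPrefixWatchTrigger", frozenset({"bucket", "prefix"}), frozenset()),
}


def check_trigger_block(block: Mapping[str, str]) -> str:
    """Return the matching SDK class name, or raise ValueError describing the problem."""
    kind = block.get("type")
    if kind not in TRIGGER_SCHEMAS:
        raise ValueError(f"unknown trigger type: {kind!r}")
    class_name, required, optional = TRIGGER_SCHEMAS[kind]
    keys = set(block) - {"type"}
    missing = required - keys
    if missing:
        raise ValueError(f"{kind}: missing {sorted(missing)}")
    unexpected = keys - required - optional
    if unexpected:
        raise ValueError(f"{kind}: unexpected {sorted(unexpected)}")
    return class_name


block = {"type": "gcs_watch_prefix", "bucket": "gcs-trigger", "prefix": "my_prefix"}
print(check_trigger_block(block))  # GCSPrefixWatchTrigger
```

A `gcs_object_exist` block without `object`, or one that uses `prefix` instead, is reported as an error rather than silently accepted.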

Other pipeline

This type of trigger is a bit different from the others because it does not involve an external event. Instead, the pipeline is triggered whenever another pipeline finishes.

The way to configure this trigger is also a bit different. In the following example, we define two pipelines called master and child. Master is responsible for triggering child.

# Pipeline master
name: master
kind: vanilla
version: 1.0.0
entities:
    ... # removed for brevity
schedule: "10 10 * * *"
start_time: "2021-01-01T00:00:00"

# Pipeline child
name: child
kind: vanilla
version: 1.0.0
entities:
    ... # removed for brevity
schedule: "master_1.0.0" # this is where the magic happens: by specifying the name + version of another pipeline, this pipeline is automatically triggered when that pipeline finishes.
start_time: "2021-01-01T00:00:00"
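The schedule value that points at another pipeline combines its name and version. The sketch below assumes the two are joined with an underscore, as in `master_1.0.0`; both helper names are hypothetical and not part of Flycs. When splitting a reference back apart it uses the last underscore, so pipeline names that themselves contain underscores still parse, provided the version does not.

```python
from typing import Tuple


def pipeline_reference(name: str, version: str) -> str:
    # Hypothetical helper: "<name>_<version>", e.g. ("master", "1.0.0") -> "master_1.0.0".
    return f"{name}_{version}"


def split_pipeline_reference(reference: str) -> Tuple[str, str]:
    # Split on the LAST underscore so names such as "daily_master" survive.
    name, sep, version = reference.rpartition("_")
    if not sep or not name or not version:
        raise ValueError(f"not a <name>_<version> reference: {reference!r}")
    return name, version


print(pipeline_reference("master", "1.0.0"))            # master_1.0.0
print(split_pipeline_reference("daily_master_1.0.0"))   # ('daily_master', '1.0.0')
```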

The same example with the Python SDK:

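from datetime import datetime, timezone

from flycs_sdk.pipelines import Pipeline, PipelineKind
from .entities import entity  # entities are defined in another module

master = Pipeline(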
    name="master",
    version="1.0.0",
    entities=[entity],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
    schedule="10 10 * * *",
)

child = Pipeline(
    name="child",
    version="1.0.0",
    entities=[entity],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
    schedule=master, # Here we pass the master Pipeline object directly into the `schedule` field.
)
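Since `schedule` holds a single value, in this model each pipeline has at most one upstream pipeline, so chained pipelines form a tree. The toy function below (hypothetical, not part of Flycs) uses that to list which pipelines would start, directly or transitively, once a given pipeline finishes. The `grandchild` pipeline in the usage example is invented for illustration.

```python
from collections import deque
from typing import Dict, List


def triggered_after(schedules: Dict[str, str], finished: str) -> List[str]:
    """Toy model: `schedules` maps each pipeline reference to its schedule value,
    either a cron expression or another pipeline's "<name>_<version>" reference.
    Returns the pipelines started, breadth-first, once `finished` completes."""
    downstream: Dict[str, List[str]] = {}
    for pipeline, schedule in schedules.items():
        # Cron expressions end up as keys too; harmless, since no pipeline finishes under that name.
        downstream.setdefault(schedule, []).append(pipeline)

    order: List[str] = []
    seen = {finished}  # guards against accidental cycles
    queue = deque(downstream.get(finished, []))
    while queue:
        pipeline = queue.popleft()
        if pipeline in seen:
            continue
        seen.add(pipeline)
        order.append(pipeline)
        queue.extend(downstream.get(pipeline, []))
    return order


schedules = {
    "master_1.0.0": "10 10 * * *",
    "child_1.0.0": "master_1.0.0",
    "grandchild_1.0.0": "child_1.0.0",  # hypothetical third pipeline
}
print(triggered_after(schedules, "master_1.0.0"))  # ['child_1.0.0', 'grandchild_1.0.0']
```

Only master carries a cron schedule here; child and the hypothetical grandchild run because the pipeline they reference has finished.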