Pipeline triggers¶
Flycs let you configure some triggers for your pipeline. A Trigger allow your pipeline to be triggered by an external event. The supported type of triggers are:
PubSub topic
Google Cloud Storage
Other pipeline
Multiple Other pipelines
PubSub topic¶
This triggers creates a subscription to the topic and then waits for any message to come. One a message is received, the rest of the pipeline is executed.
The PubSub trigger has 2 properties you can configure:
topic: The full path to a pubsub topic. The topic MUST exist before the pipeline with the trigger is executed.
subscription_project: Optional property that let you choose in which project the subscription to the topic will be created.
Here is an example Pipeline definition using the PubSub trigger:
name: demo
kind: vanilla
version: 1.0.0
entities:
- name: demo
version: 1.0.0
stage_config:
- name: preamble
versions: {}
- name: staging
versions:
simple_copy: "1.0.0"
- name: datalake
versions:
simple_copy: "1.0.0"
history: "1.0.0"
- name: data_warehouse
versions:
simple_copy: "1.0.0"
manipulating_pii_fields: "1.0.0"
- name: data_mart
versions:
simple_copy: "1.0.0"
salary: "1.0.0"
start_time: "2021-01-01T00:00:00"
trigger:
type: "pubsub"
topic: "projects/my_project/topics/my_topic"
subscription_project: my_other_project
The same pipeline defined using the Flycs SDK:
from datetime import datetime, timezone
from flycs_sdk.pipelines import Pipeline, PipelineKind
from flycs_sdk.triggers import PubSubTrigger
from .entities import entity # entities are define in another modules, we just import them here.
p = Pipeline(
name="demo",
version="1.0.0",
entities=[entity],
kind=PipelineKind.VANILLA,
start_time=datetime.now(tz=timezone.utc),
trigger=PubSubTrigger(topic="projects/my_project/topics/my_topic", subscription_project="my_other_project"),
)
Note how the schedule property is not required on the pipeline definition when you specify a trigger. The reason for this is that Flycs will automatically configure your pipeline to be schedule every seconds so that it is always running and waiting on the trigger event to occur.
Google Cloud Storage¶
The event sent from GCS can actually be of 3 types:
Trigger when an object exists: As soon as the watched object is created and for as long as the object exists, the DAG is triggered.
Trigger when an update change: When the update time of an object change, the DAG is triggered.
Trigger by watching a prefix in a bucket: As soon as the prefix or any object under this prefix exists, the DAG is triggered.
Here are the YAML example how to define these GSC triggers. For brevity , only the trigger block is shown here.
Object exist trigger:
trigger:
type: "gcs_object_exist"
bucket: "gcs-trigger"
object: "subdir/my_object"
GCSObjectExistTrigger(
bucket="gcs-trigger",
object="subdir/my_object"
)
Object update trigger:
trigger:
type: "gcs_object_change"
bucket: "gcs-trigger"
object: "subdir/my_object"
GCSObjectChangeTrigger(
bucket="gcs-trigger",
object="subdir/my_object"
)
Prefix watch trigger:
trigger:
type: "gcs_watch_prefix"
bucket: "gcs-trigger"
prefix: "my_prefix"
GCSPrefixWatchTrigger(
bucket="gcs-trigger",
prefix="my_prefix"
)
Other pipeline¶
This type of trigger is a bit different from the other one because it does not involve an external event. Instead, the pipeline is triggered whenever another pipeline is done.
The way to configure this trigger is also a bit different, here is an example. Here we define 2 pipelines called master and child. Master is responsible to trigger child.
# Pipeline master
name: master
kind: vanilla
version: 1.0.0
entities:
... # removed for brevity
schedule: "10 10 * * *"
start_time: "2021-01-01T00:00:00"
# Pipeline child
name: child
kind: vanilla
version: 1.0.0
entities:
... # removed for brevity
schedule: "master_1.0.0" # this is where the magic happens, by specifying the name + version of another pipeline, this pipeline will be automatically triggered.
start_time: "2021-01-01T00:00:00"
Same example with the python SDK:
master = Pipeline(
name="master",
version="1.0.0",
entities=[entity],
kind=PipelineKind.VANILLA,
start_time=datetime.now(tz=timezone.utc),
schedule="10 10 * * *",
)
child = Pipeline(
name="child",
version="1.0.0",
entities=[entity],
kind=PipelineKind.VANILLA,
start_time=datetime.now(tz=timezone.utc)
schedule=master, # Here we pass the master Pipeline object directly into the `schedule` field.
)
Multiple Other pipelines¶
This trigger is used when you want a child pipeline to be triggered by multiple parents pipelines. Using this trigger will actually used sensor on the child pipeline to wait for all the parents pipelines to finish.
This feature use ExternalSensor on the child pipeline. The schedule period of it will be automatically computed to run at the wider cron job scheduled of the list of the parents pipelines.
E.g : Having pipeline_A running 30 9 * * * (every day UTC at 9.30) and pipeline_B running */30 * * * * (every 30 minutes) with a child pipeline_C depending on both A and B. Then, the pipeline_C will be scheduled with the cron configuration 30 9 * * *.
Using yaml definition :
# parent pipeline_A
name: pipeline_A
kind: vanilla
version: 1.0.0
entities:
... # removed for brevity
schedule: "30 9 * * *"
start_time: "2021-01-01T00:00:00"
# parent pipeline_B
name: pipeline_B
kind: vanilla
version: 1.0.0
entities:
... # removed for brevity
schedule: "*/30 * * * *"
start_time: "2021-01-01T00:00:00"
# child pipeline_C
name: pipeline_C
kind: vanilla
version: 1.0.0
entities:
... # removed for brevity
schedule: ["pipeline_A","pipeline_B"]
start_time: "2021-01-01T00:00:00" # /!\ start_time of all the parents pipeline (A & B) must be the same as the pipeline_C
Using python pdk :
now_datetime = datetime.now(tz=timezone.utc)
pipeline_A = Pipeline(
name="pipeline_A",
version="1.0.0",
entities=[entity],
kind=PipelineKind.VANILLA,
start_time=now_datetime,
schedule="30 9 * * *",
)
pipeline_B = Pipeline(
name="pipeline_B",
version="1.0.0",
entities=[entity],
kind=PipelineKind.VANILLA,
start_time=now_datetime,
schedule="*/30 * * * *",
)
pipeline_C = Pipeline(
name="pipeline_C",
version="1.0.0",
entities=[entity],
kind=PipelineKind.VANILLA,
start_time=now_datetime,
schedule=[pipeline_A, pipeline_B], # Here we pass the list of parents pipelines objects.
)