Transformations

Flycs lets you define BigQuery queries that run as BigQuery operators during the DAG run in which the query is used. To define a query with the Python SDK, use a Transformation object.

from flycs_sdk.transformations import Transformation
query = Transformation(
    name="simple_copy",
    query="SELECT * FROM raw.alpha.employees.employees AS raw",
    version="1.0.0",
    static=True,
)

How to use the Flycs SDK to create parameterized queries?

Sometimes you need to generate queries from dynamic information, or you want a pipeline that runs the same query against different tables. The easiest way to do either is to build the queries programmatically with the Flycs SDK.

The following example shows how to define a pipeline that contains parameterized queries.

from datetime import datetime, timezone

from flycs_sdk.pipelines import Pipeline, PipelineKind
from flycs_sdk.entities import Entity
from flycs_sdk.transformations import Transformation

# define your list of parameters
parameters = [
    ("table1", "tables2"),
    ("table3", "tables4"),
    ("table5", "tables6"),
]

# generate the transformation for each parameter
transformations = []
for table1, table2 in parameters:
    query = Transformation(
        name="transformation_" + "_".join([table1, table2]),
        # Notice how we generate the content of the query using
        # the parameters defined at the top of the file
        query=f"SELECT * FROM {table1} LEFT JOIN {table2}",
        version="1.0.0",
    )
    transformations.append(query)

# define the entity
entity = Entity(
    name="my_entity",
    version="1.0.0",
    stage_config={},
    transformations={},
)
# insert the transformations into the entity
# (each dict comprehension already covers every transformation, so no loop is needed)
# stage_config is a dict that contains the stage name as key and a dictionary of
# transformation name and version as value
entity.stage_config["staging"] = {t.name: t.version for t in transformations}
# transformations is a dict that contains the stage name as key and a dictionary of
# transformation name and transformation object as value
entity.transformations["staging"] = {t.name: t for t in transformations}

# define the pipeline
my_pipeline = Pipeline(
    name="my_pipeline",
    version="1.0.0",
    schedule="10 10 * * *",
    entities=[entity],
    kind=PipelineKind.VANILLA,
    start_time=datetime.now(tz=timezone.utc),
)

# to be picked up by the framework, all pipelines need to be
# added to a variable called `pipelines`
pipelines = [my_pipeline]
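
To check the shape of the two dictionaries built in the example above without installing the SDK, the same generation logic can be sketched with a stand-in class. This is only an illustration: `StubTransformation` and `build_stage` are hypothetical names that are not part of flycs_sdk, and the stub keeps only the name, query and version fields used above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StubTransformation:
    """Hypothetical stand-in for Transformation (not part of flycs_sdk)."""

    name: str
    query: str
    version: str


def build_stage(parameters, stage="staging", version="1.0.0"):
    """Build the stage_config and transformations dicts for a single stage."""
    items = [
        StubTransformation(
            name="transformation_" + "_".join([left, right]),
            query=f"SELECT * FROM {left} LEFT JOIN {right}",
            version=version,
        )
        for left, right in parameters
    ]
    # stage name -> {transformation name: version}
    stage_config = {stage: {t.name: t.version for t in items}}
    # stage name -> {transformation name: transformation object}
    transformations = {stage: {t.name: t for t in items}}
    return stage_config, transformations


stage_config, transformations = build_stage(
    [("table1", "tables2"), ("table3", "tables4")]
)
print(stage_config)
```

Both dictionaries share the same keys: one entry per stage, then one entry per generated transformation name. Printing them before handing them to an entity is a quick way to spot duplicate names, since a repeated name would silently overwrite an earlier entry.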