Advanced Workflow Features
This article outlines advanced functionalities of Workflows.
Visit this GitHub repository for sample YAML Workflow code.
For full information on advanced uses of Mistral, please visit the Mistral documentation here.
View or Write YAML
To access the YAML for your Workflow in the UI, click the YAML tab. The YAML is the underlying code used to drive the Workflow, and will automatically update when changes are made within the graph view or other tabs.
For the YAML tab and other tabs, you can drag the center bar to increase or decrease the size of the window to have additional room to edit.
Some notes on writing YAML syntax:
- Tabs are replaced by 2 spaces.
- If you have nothing selected:
  - Tab will indent to the next multiple of 2 spaces ("soft tabs").
  - Shift-Tab will de-indent the line by 2 spaces.
- If you have something selected (whether it spans multiple lines or not):
  - Tab will indent all the lines in the selection by 2 spaces.
  - Shift-Tab will de-indent all the lines in the selection by 2 spaces.
- If you paste in some code that includes tabs, each tab will be replaced by 2 spaces.
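The tab rules above can be sketched in a few lines of Python. This is only an illustration of the described behavior, not the editor's actual code; all function names are hypothetical.

```python
INDENT = 2  # indent width used by the YAML tab, per the notes above


def expand_tabs(text):
    """Pasting: every tab character becomes two spaces."""
    return text.replace("\t", " " * INDENT)


def soft_tab(line, cursor):
    """Tab with no selection: pad so the cursor lands on the next multiple of 2."""
    pad = INDENT - (cursor % INDENT)
    return line[:cursor] + " " * pad + line[cursor:], cursor + pad


def indent_lines(lines):
    """Tab with a selection: indent every selected line by 2 spaces."""
    return [" " * INDENT + line for line in lines]


def dedent_lines(lines):
    """Shift-Tab: remove up to 2 leading spaces from every line."""
    result = []
    for line in lines:
        leading = len(line) - len(line.lstrip(" "))
        result.append(line[min(INDENT, leading):])
    return result


print(repr(expand_tabs("\tkey: value")))
print(soft_tab("abc", 3))
print(dedent_lines(["    a", " b", "c"]))
```

Note that the de-indent sketch never removes more than the leading spaces a line actually has, so a line with a single leading space simply ends up flush left.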
When viewing the YAML for a previously executed workflow, the YAML presented is the historical record of what was run for that workflow, and is not editable.
Advanced Action Types
As Civis Workflows are designed to chain tasks you would otherwise manually execute within the Civis Platform, supported action types are limited to actions you would take within Platform. Each action corresponds to an API call, where required (and optional) request parameters are passed in under the input keyword.
The default action type for workflows is ‘civis.run_job’, but two other types can be used from the UI. Additional types may also be used directly from the YAML.
Note: We also support a subset of the default system actions, including:
- std.async_noop
- std.echo
- std.fail
- std.noop
Actions with UI Support
Selecting ‘civis.scripts.container’ within the info pane will require the user to enter the appropriate Git or Docker information to run their container script.
When clicked, the Git Repo URL will automatically list existing repositories the user has access to.
Below Git and Docker are Resources, which are automatically filled with the default values.
‘civis.scripts.custom’ will require the user to select the template they want to use. The dropdown will again act as a search bar.
Based on the template, the user may also have to add values for required parameters, as in this example.
All parameter types, such as credentials, are supported by this UI and will present the appropriate dropdown selection. (See Parameterized Workflows documentation for more information on parameters.)
YAML-only Actions
Using an existing workflow
You can execute another workflow from within your workflow. This can only be done via the YAML code configuration. Use the civis.workflows.execute action and specify the ID of the workflow you want to execute.
tasks:
  execute_subworkflow:
    action: civis.workflows.execute
    input:
      workflow_id: 56
This action also takes as inputs any of the other parameters from the specified workflow’s corresponding POST API endpoint. (API documentation)
Creating new jobs
You can also create new jobs directly via your workflow. This can only be done via the YAML code configuration. These actions take as inputs any of the parameters from their corresponding POST API endpoint. The currently supported actions are:
- civis.scripts.python3 (API documentation)
- civis.scripts.r (API documentation)
- civis.scripts.container (API documentation)
- civis.scripts.sql (API documentation)
- civis.scripts.javascript (API documentation)
- civis.scripts.dbt (API documentation)
- civis.scripts.custom (API documentation)
- civis.import (API documentation)
- civis.enhancements.cass_ncoa (API documentation)
Because one of these inputs is the code body block for scripts, it's possible to define the entire job inside a workflow.
tasks:
  load_data:
    action: civis.scripts.python3
    input:
      name: my python script
      source: |
        print('hello world')
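If you generate workflow YAML programmatically, the main pitfall with inline scripts is indenting the code body correctly under `source: |`. Below is a minimal sketch, assuming you assemble the YAML as plain text. The helper `python_task_yaml` is hypothetical and not part of any Civis library; it only formats text and does not call the API.

```python
import textwrap


def python_task_yaml(task_name, job_name, source):
    """Render one civis.scripts.python3 task with its code inline.

    Hypothetical helper. It assumes job_name contains no characters that
    need YAML quoting.
    """
    # Indent the script body two spaces deeper than the `source:` key.
    body = textwrap.indent(textwrap.dedent(source).strip("\n"), " " * 10)
    return (
        f"    {task_name}:\n"
        f"      action: civis.scripts.python3\n"
        f"      input:\n"
        f"        name: {job_name}\n"
        f"        source: |\n"
        f"{body}\n"
    )


header = "version: '2.0'\nworkflow:\n  tasks:\n"
print(header + python_task_yaml("load_data", "my python script", "print('hello world')"))
```

The printed text can be pasted into the YAML tab; review it there before running, since this sketch performs no validation.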
YAML Comments
You can add a comment to the YAML by starting it with a #. Please note that YAML comments are not supported by the graphical user interface: editing a workflow with comments through the GUI will cause the comments to be deleted. The graph of any workflow with YAML comments is locked to prevent this from happening accidentally, but we strongly encourage users to use the “Comments” field in the info pane rather than inline comments, as those comments will be preserved.
Pause a workflow
To pause a workflow, use the following:
version: '2.0'  # you always need this key to specify version 2 of the Mistral DSL
pause:
  tasks:
    python:
      action: civis.python3_script
      input:
        name: python is the best
        source: print("Hello from Python!")
      # Here we tell Mistral to pause the workflow once this task
      # succeeds. Note that Mistral launches the tasks in the order they
      # are in the list. Thus the task `python_never_run` is never
      # executed.
      # You can resume the workflow via the POST /workflows/:id/executions/:execution_id/resume
      # API endpoint.
      on-success:
        - pause
        - python_never_run
    python_never_run:
      action: civis.python3_script
      input:
        name: python that is never run
        source: print("You will never see this message!")
A workflow may also be paused by adding the "pause-before: true" attribute to a task in code mode or by adding a pause item to the on-success, on-complete, or on-error attributes for a task as shown above. To resume a paused workflow, you can use the “Resume Execution” button on the Execution page, or you can resume via an API endpoint:
https://platform.civisanalytics.com/#/api#v1_post_workflows_id_executions_execution_id_resume
Using run outputs
Civis script actions automatically register their outputs with the workflow. This means that you can reference the outputs of a script in other places in your workflow. For example, this will insert the file ID from the load_data task directly into the Python code for the job.
log_output:
  action: civis.scripts.python3
  input:
    name: log output script
    source: |
      import civis
      file_id = <% task(load_data).result.outputs[0].object_id %>
      # civis_to_file writes bytes, so open the file in binary mode
      with open('downloaded_file.csv', 'wb') as f:
          civis.io.civis_to_file(file_id, f)
      print(open('downloaded_file.csv', 'r').read())
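To make the substitution concrete, here is a small Python emulation of what happens to the `<% ... %>` expression before the script runs. It is a sketch only: it is not Mistral's expression engine, it handles just the one expression shape used above, and the stored result (including the ID 1234) is a made-up stand-in.

```python
import re

# Hypothetical stand-in for the result recorded after `load_data` succeeds.
# Only the `outputs[...].object_id` path comes from the example above.
task_results = {
    "load_data": {"outputs": [{"object_id": 1234}]},
}

# Matches expressions of the form <% task(NAME).result.outputs[N].object_id %>
EXPRESSION = re.compile(
    r"<%\s*task\((\w+)\)\.result\.outputs\[(\d+)\]\.object_id\s*%>"
)


def render(source, results):
    """Replace each supported expression with the referenced object ID."""
    def lookup(match):
        task_name, index = match.group(1), int(match.group(2))
        return str(results[task_name]["outputs"][index]["object_id"])

    return EXPRESSION.sub(lookup, source)


template = "file_id = <% task(load_data).result.outputs[0].object_id %>"
print(render(template, task_results))  # file_id = 1234
```

Because the value is inserted as text, the script receives a literal ID in its source code rather than a variable it has to look up at run time.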
Using anchors
YAML provides anchors and aliases to allow reuse of parts of your workflow definition. An anchor is defined with & and referenced (aliased) with *. For example, you could reuse your notification settings for each job as below:
task1:
  action: civis.scripts.python3
  input:
    name: task 1
    source: print('task 1')
    notifications: &default_notifications
      success-on: false
      failure-on: false
      failure-emails: john@acme.com
task2:
  action: civis.scripts.python3
  input:
    name: task 2
    source: print('task 2')
    notifications: *default_notifications
Please note that the graphical user interface does not fully support anchors. As such, editing a workflow with YAML anchors using the GUI will cause the anchors to expand to include all the anchor code.
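The difference between an alias and its expanded form can be illustrated with plain Python dictionaries. This is an analogy rather than how the editor is implemented: an alias behaves like two tasks sharing one object, while expansion gives every task its own full copy.

```python
import copy

# The mapping anchored as &default_notifications in the YAML above.
default_notifications = {
    "success-on": False,
    "failure-on": False,
    "failure-emails": "john@acme.com",
}

# Alias (*default_notifications): both tasks share one definition.
aliased = {
    "task1": {"notifications": default_notifications},
    "task2": {"notifications": default_notifications},
}

# Expansion: every task receives its own independent copy.
expanded = {name: copy.deepcopy(task) for name, task in aliased.items()}

shared = aliased["task1"]["notifications"] is aliased["task2"]["notifications"]
separate = expanded["task1"]["notifications"] is not expanded["task2"]["notifications"]
same_content = expanded["task1"] == aliased["task1"]
print(shared, separate, same_content)
```

After expansion the settings are identical in content, but a later change to one task's notifications no longer carries over to the others.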
Reverse workflows
Civis also supports reverse workflows via the API. In contrast to direct workflows, each task specifies other tasks it depends on, instead of specifying what tasks follow. Reverse workflows are essentially dependency graphs.
workflow:
  type: reverse
  tasks:
    task1:
      action: civis.scripts.python3
      input:
        name: import data
        source: print('importing data')
    task2:
      action: civis.scripts.python3
      input:
        name: do science
        source: print('doing science')
      requires:
        - task1
Reverse workflows allow you to create executions that only run the tasks that a given “target task” depends on. When you execute them via the API, you must include the name of the target task: https://platform.civisanalytics.com/#/api#post_workflows_id_executions
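The idea of running only what a target task depends on can be sketched as a walk over the requires graph. The function below illustrates the concept and is not Mistral's implementation. `task3` is a hypothetical extra task, added to show that unrelated tasks are left out.

```python
def tasks_for_target(requires, target):
    """Return the target and everything it depends on, dependencies first."""
    ordered = []
    visiting = set()

    def visit(task):
        if task in ordered:
            return
        if task in visiting:
            raise ValueError(f"circular dependency at {task!r}")
        visiting.add(task)
        for dependency in requires.get(task, []):
            visit(dependency)
        visiting.discard(task)
        ordered.append(task)

    visit(target)
    return ordered


# Dependencies from the example above, plus a hypothetical unrelated task3.
requires = {"task1": [], "task2": ["task1"], "task3": []}
print(tasks_for_target(requires, "task2"))  # ['task1', 'task2']
print(tasks_for_target(requires, "task1"))  # ['task1']
```

Targeting `task2` pulls in `task1` because it is required, while targeting `task1` runs that task alone.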
Partial Execution via the API
To execute a partial workflow via the API, pass in an included_tasks argument to your execute function. The argument should include a comma-separated list of task names.
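import civis
client = civis.APIClient()  # reads your API key from the CIVIS_API_KEY environment variable
partial_execution_included_tasks = ["read_data", "clean_data"]
client.workflows.post_executions(WORKFLOW_ID, included_tasks=partial_execution_included_tasks)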
Join conditions
Join conditions are a way to control the flow of a workflow when a task has multiple upstream tasks. You can find the Mistral documentation on join conditions here.
We strongly recommend adding join conditions to a task with multiple upstream tasks. In fact, when connections are created via the Graph or Form in the UI such that a task now has multiple upstream tasks, we add a join: all condition to that task by default.
Valid join conditions
There are two categories of join conditions, Full Joins and Partial Joins.
Full Join
Full Joins use the all keyword. With join: all, the task will wait until all upstream conditions are met before running.
Example: Let’s say we have a task called “Task_1” with a join: all condition. If one of Task_1’s upstream conditions is not met, e.g. the task “Import_data” fails instead of succeeding, “Task_1” will not execute.
workflow:
  tasks:
    Import_data:
      action: civis.scripts.python3
      input:
        name: import data
        source: print('importing data')
      on-success: Task_1
    Import_other_data:
      action: civis.run_job
      input:
        job_id: 1234
      on-success: Task_1
    Task_1:
      join: all
      action: civis.run_job
      input:
        job_id: 5678
Partial Join
Partial Joins are defined by integer join values, e.g. 1, 2, 3, 4. With join: n, the task will run when at least n upstream tasks have completed and their corresponding conditions have triggered. It is possible for the task to be triggered more than once if it has a partial join.
Special case: Join on one. Mistral accepts either the string one or number 1 for this partial join case only. This is just syntax; join: one behaves like all other partial joins. Practically, join: one isn’t any different from omitting the join condition entirely, but sometimes it is nice to have for documentation, in order to note that only one leading task is required.
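The join rules can be summarized with a small decision function. This is a simplified model for illustration, not Mistral's code: it only answers whether the condition is currently met and ignores the repeated triggering that partial joins allow.

```python
def join_satisfied(join, triggered, upstream_total):
    """Decide whether a task's join condition is met.

    join: "all", "one", or a positive integer n (partial join).
    triggered: number of upstream tasks whose transition to this task fired.
    upstream_total: number of upstream tasks that point at this task.
    """
    if join == "all":
        return triggered == upstream_total
    needed = 1 if join == "one" else int(join)
    return triggered >= needed


# Task_1 has two upstream tasks; suppose Import_data failed and only
# Import_other_data triggered its on-success transition.
print(join_satisfied("all", triggered=1, upstream_total=2))  # False
print(join_satisfied(1, triggered=1, upstream_total=2))      # True
print(join_satisfied("one", triggered=1, upstream_total=2))  # True
print(join_satisfied(2, triggered=1, upstream_total=2))      # False
```

With join: all the failed import blocks Task_1, whereas join: 1 (or join: one) lets it run as soon as either upstream task succeeds.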