Replication
Replication hooks allow you to trigger nested replication tasks within your workflow. This is particularly useful for orchestrating complex data pipelines, running dependent replications, or managing multi-step data transformations.
Configuration
```yaml
- type: replication
  path: "path/to/replication.yaml"  # Required: Path to replication configuration file
  mode: "full-refresh"              # Optional: Override replication mode
  range: "2021-01-01,2022-01-01"    # Optional: Override backfill range for incremental mode
  streams: ["stream1", "stream2"]   # Optional: List of specific streams to run
  env:                              # Optional: Environment variables for the replication
    ENV_VAR1: "value1"
    ENV_VAR2: "value2"
  on_failure: abort                 # Optional: abort/warn/quiet/skip
  id: my_id                         # Optional: generated if omitted. Use the `log` hook with {runtime_state} to view state.
```
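The `range` value in the configuration above is written as a start and an end separated by a comma. The following minimal sketch shows how such a string might be split and sanity-checked. It assumes both bounds are ISO dates (`YYYY-MM-DD`), as in the example value, and `parse_range` is a hypothetical helper, not part of Sling.

```python
from datetime import date


def parse_range(value: str) -> tuple[date, date]:
    """Split a hypothetical 'start,end' range override into two dates.

    Assumes ISO dates (YYYY-MM-DD), as in "2021-01-01,2022-01-01".
    """
    parts = [p.strip() for p in value.split(",")]
    if len(parts) != 2:
        raise ValueError(f"expected 'start,end', got {value!r}")
    start, end = (date.fromisoformat(p) for p in parts)
    # A range whose start is after its end cannot select any data.
    if start > end:
        raise ValueError(f"range start {start} is after end {end}")
    return start, end


if __name__ == "__main__":
    print(parse_range("2021-01-01,2022-01-01"))
    # (datetime.date(2021, 1, 1), datetime.date(2022, 1, 1))
```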
Properties
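| Property | Required | Description |
| --- | --- | --- |
| path | Yes | Path to the replication configuration file |
| mode | No | Override the replication mode (e.g. `full-refresh`, `incremental`) |
| range | No | Override the backfill range for incremental mode, given as `start,end` |
| streams | No | List of specific streams to run. If not provided, all streams are run |
| env | No | Map of environment variables to set for the replication |
| on_failure | No | What to do if the replication fails (`abort`/`warn`/`quiet`/`skip`) |
| id | No | Identifier used to reference this hook's output in later hooks. Generated if omitted |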
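The property table can be read as a set of validation rules. The sketch below checks a hook, parsed into a Python dict, against those rules: `path` is required, `streams` is a list, `env` is a mapping, and `on_failure` is one of the four listed values. `validate_hook` is a hypothetical helper written for illustration. Sling performs its own validation, which may differ.

```python
from typing import Any

# Allowed values, taken from the on_failure property description.
ON_FAILURE_VALUES = {"abort", "warn", "quiet", "skip"}


def validate_hook(hook: dict[str, Any]) -> list[str]:
    """Return a list of problems found in a replication hook mapping (empty if none)."""
    problems: list[str] = []
    if hook.get("type") != "replication":
        problems.append("type must be 'replication'")
    if not hook.get("path"):
        problems.append("path is required")
    streams = hook.get("streams")
    if streams is not None and not (
        isinstance(streams, list) and all(isinstance(s, str) for s in streams)
    ):
        problems.append("streams must be a list of strings")
    env = hook.get("env")
    if env is not None and not isinstance(env, dict):
        problems.append("env must be a mapping of variable names to values")
    on_failure = hook.get("on_failure")
    if on_failure is not None and on_failure not in ON_FAILURE_VALUES:
        problems.append(f"on_failure must be one of {sorted(ON_FAILURE_VALUES)}")
    return problems


if __name__ == "__main__":
    print(validate_hook({"type": "replication", "streams": "stream1"}))
    # ['path is required', 'streams must be a list of strings']
```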
Output
When the replication hook executes successfully, it returns the following output that can be accessed in subsequent hooks:
```yaml
status: success                  # Status of the hook execution
path: "path/to/replication.yaml" # The replication configuration file path
range: "2021-01-01,2022-01-01"   # The backfill range used (if specified)
streams: ["stream1", "stream2"]  # The streams that were processed
```
You can access these values in subsequent hooks using the following syntax (JMESPath), where `hook_id` is the hook's `id`:
- `{state.hook_id.status}`: Status of the hook execution
- `{state.hook_id.path}`: The replication configuration file path
- `{state.hook_id.range}`: The backfill range used
- `{state.hook_id.streams}`: The streams that were processed
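To illustrate how these references resolve against the hook output, here is a simplified sketch that replaces `{dotted.path}` placeholders by walking a nested dict. It covers plain dotted lookups only. JMESPath supports much more, such as filters, functions, and projections, and Sling's evaluator is not reproduced here. The context layout and the `my_id` hook id are assumptions for the example.

```python
import re
from typing import Any

# Matches placeholders such as {state.my_id.status}.
PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")


def lookup(context: dict[str, Any], dotted: str) -> Any:
    """Walk a nested dict along a dotted path like 'state.my_id.status'."""
    value: Any = context
    for part in dotted.split("."):
        if not isinstance(value, dict) or part not in value:
            raise KeyError(dotted)
        value = value[part]
    return value


def render(template: str, context: dict[str, Any]) -> str:
    """Replace every {dotted.path} placeholder with its value from context."""
    return PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), template)


if __name__ == "__main__":
    # Hypothetical state after a replication hook with id "my_id" succeeded.
    context = {
        "state": {
            "my_id": {
                "status": "success",
                "path": "path/to/replication.yaml",
                "range": "2021-01-01,2022-01-01",
                "streams": ["stream1", "stream2"],
            }
        }
    }
    print(render("{state.my_id.path} finished with status {state.my_id.status}", context))
    # path/to/replication.yaml finished with status success
```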
Examples
Run Dependent Replication
Execute a dependent replication after successful processing:
```yaml
hooks:
  post:
    - type: replication
      if: run.status == "success"
      path: "configs/dependent_replication.yaml"
      on_failure: abort
```
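The `if` condition above gates the nested replication on the outcome of the current run. The minimal sketch below evaluates single comparisons of the form `<path> <op> <literal>` against a hypothetical runtime context. It is a deliberately restricted stand-in, not Sling's expression engine. It does not handle `and`/`or`, function calls, or nested expressions.

```python
import ast
import operator
import re
from typing import Any, Optional

# Comparison operators this sketch understands; in the regex, two-character
# operators come first so ">=" is not read as ">".
OPS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}
CONDITION = re.compile(r"\s*([A-Za-z_][\w.]*)\s*(==|!=|>=|<=|>|<)\s*(.+?)\s*")


def lookup(context: dict[str, Any], dotted: str) -> Any:
    """Walk a nested dict along a dotted path such as 'run.status'."""
    value: Any = context
    for part in dotted.split("."):
        value = value[part]
    return value


def should_run(condition: Optional[str], context: dict[str, Any]) -> bool:
    """Evaluate one '<path> <op> <literal>' condition; no condition means run."""
    if condition is None:
        return True
    match = CONDITION.fullmatch(condition)
    if match is None:
        raise ValueError(f"unsupported condition: {condition!r}")
    path, op, literal = match.groups()
    # ast.literal_eval turns '"success"' into 'success' and '1000' into 1000.
    return OPS[op](lookup(context, path), ast.literal_eval(literal))


if __name__ == "__main__":
    run_context = {"run": {"status": "success"}}
    if should_run('run.status == "success"', run_context):
        print("trigger configs/dependent_replication.yaml")
```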
Environment-Specific Replication
Run different replication configurations based on environment:
```yaml
hooks:
  post:
    - type: replication
      path: "configs/{target.environment}/transform.yaml"
      streams: ["{run.stream.name}_processed"]
      env:
        SOURCE_SCHEMA: "{run.stream.schema}"
        TARGET_TABLE: "{run.object.name}_final"
      on_failure: warn
```
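In this example, placeholders appear in a plain string (`path`), inside a list (`streams`), and inside a mapping (`env`). The following sketch shows one way to render every string in a hook, recursing through lists and dicts. The runtime values (`prod`, `orders`, `public`) are hypothetical, and the sketch models the idea only. It is not how Sling implements substitution.

```python
import re
from typing import Any

# Matches placeholders such as {target.environment} or {run.stream.name}.
PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")


def lookup(context: dict[str, Any], dotted: str) -> Any:
    """Walk a nested dict along a dotted path."""
    value: Any = context
    for part in dotted.split("."):
        value = value[part]
    return value


def render_value(value: Any, context: dict[str, Any]) -> Any:
    """Render placeholders in strings, recursing into lists and dicts."""
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), value)
    if isinstance(value, list):
        return [render_value(v, context) for v in value]
    if isinstance(value, dict):
        return {k: render_value(v, context) for k, v in value.items()}
    return value  # numbers, booleans, None pass through unchanged


if __name__ == "__main__":
    hook = {
        "type": "replication",
        "path": "configs/{target.environment}/transform.yaml",
        "streams": ["{run.stream.name}_processed"],
        "env": {
            "SOURCE_SCHEMA": "{run.stream.schema}",
            "TARGET_TABLE": "{run.object.name}_final",
        },
        "on_failure": "warn",
    }
    # Hypothetical runtime values, for illustration only.
    context = {
        "target": {"environment": "prod"},
        "run": {"stream": {"name": "orders", "schema": "public"}, "object": {"name": "orders"}},
    }
    print(render_value(hook, context))
```

With these values, `path` becomes `configs/prod/transform.yaml` and `TARGET_TABLE` becomes `orders_final`.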
Conditional Stream Selection
Select streams based on runtime conditions:
```yaml
hooks:
  post:
    - type: replication
      path: "configs/conditional_replication.yaml"
      if: run.total_rows > 1000
      streams:
        - "{run.stream.name}_analytics"
        - "{run.stream.name}_archive"
      env:
        PROCESS_DATE: "{timestamp.date}"
        SOURCE_TABLE: "{run.object.full_name}"
```
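Because the condition uses a strict `>`, a run with exactly 1000 rows does not trigger the nested replication. The following sketch hand-translates this example into a hypothetical helper that returns the streams and environment the hook would use, or `None` when it is skipped. It assumes `{timestamp.date}` renders as an ISO date. The run fields mirror the placeholders above, and the sample values are made up.

```python
from datetime import date
from typing import Any, Optional


def plan_conditional_replication(
    run: dict[str, Any], today: date, threshold: int = 1000
) -> Optional[dict[str, Any]]:
    """Return the streams and env the hook would use, or None if it is skipped."""
    # Mirrors `if: run.total_rows > 1000`: the comparison is strict, so a run
    # with exactly `threshold` rows skips the nested replication.
    if run["total_rows"] <= threshold:
        return None
    name = run["stream"]["name"]
    return {
        "streams": [f"{name}_analytics", f"{name}_archive"],
        "env": {
            # Assumption: {timestamp.date} renders as an ISO date (YYYY-MM-DD).
            "PROCESS_DATE": today.isoformat(),
            "SOURCE_TABLE": run["object"]["full_name"],
        },
    }


if __name__ == "__main__":
    run = {"total_rows": 1500, "stream": {"name": "orders"}, "object": {"full_name": "public.orders"}}
    print(plan_conditional_replication(run, date(2024, 1, 31)))
    print(plan_conditional_replication({**run, "total_rows": 1000}, date(2024, 1, 31)))  # None
```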