Examples of using Sling to incrementally load data from databases to databases
New Data Upsert
This mode performs incremental loading by only processing new/updated records based on an update key. It requires both a primary key and update key.
source: postgres
target: snowflake
defaults:
mode: incremental
primary_key: id
update_key: updated_at
object: new_schema.{stream_schema}_{stream_table}
streams:
public.orders:
# Will only load records where updated_at is greater than the max value in target
public.customers:
primary_key: [customer_id] # Override default primary key
update_key: last_modified # Override default update key
Full Data Upsert
This mode performs incremental loading by processing the full source dataset and upserting records based on the primary key. No update key is required.
source: postgres
target: snowflake
defaults:
mode: incremental
primary_key: id
object: new_schema.{stream_schema}_{stream_table}
streams:
public.products:
# Will load all records and upsert based on id
public.categories:
primary_key: [category_id, region] # Composite primary key
Append Only
This mode performs incremental loading by only appending new records based on an update key, without updating existing records. No primary key is required.
source: postgres
target: snowflake
defaults:
mode: incremental
update_key: created_at
object: new_schema.{stream_schema}_{stream_table}
streams:
public.events:
# Will only append records where created_at is greater than max value in target
public.logs:
update_key: timestamp # Override default update key
Custom SQL
source: postgres
target: snowflake
defaults:
mode: incremental
primary_key: id
update_key: modified_at
object: new_schema.{stream_schema}_{stream_table}
streams:
public.orders:
sql: |
select * from public.orders
where {incremental_where_cond}
order by modified_at asc
public.customers:
sql: |
with ranked_customers as (
select *
from public.customers
where modified_at > coalesce({incremental_value}, '2001-01-01')
)
select * from ranked_customers where rn = 1
Using SLING_STATE
Here is an example, where sling will store the incremental values in the my/state path, in the AWS_S3 connection:
When loading data incrementally, you may want to handle records that exist in the target but are missing from the source. The delete_missing option supports two modes:
hard: Physically deletes records from the target table that no longer exist in the source
soft: Marks records as deleted in the target table by setting a deletion timestamp
Careful not to enable this feature on massive tables. The primary key column(s) is fully selected from the source stream each run in order to determine which records don't exist anymore.
Hard Delete Example
source: MY_POSTGRES
target: MY_SNOWFLAKE
defaults:
mode: incremental
update_key: updated_at
primary_key: id # primary key is required for delete_missing
streams:
finance.accounts:
object: finance.accounts_target
target_options:
delete_missing: hard # will remove records that don't exist in source
Soft Delete Example
source: MY_POSTGRES
target: MY_SNOWFLAKE
defaults:
mode: incremental
update_key: updated_at
primary_key: id # primary key is required for delete_missing
target_options:
delete_missing: soft # will mark records as deleted with timestamp
streams:
finance.accounts:
object: finance.accounts_target
When using soft delete mode, Sling will add a _sling_deleted_at timestamp column to track when records were marked as deleted.
Important Notes:
The delete_missing option requires that you specify a primary_key to uniquely identify records
For incremental loads, an update_key is also required to determine which records to process
The comparison is done using a temporary table to efficiently identify missing records
This mode allows using custom SQL queries with incremental loading by using special variables that Sling will replace at runtime. See from more details.
If we wish to store the incremental state externally (and avoid using the max value of the target table), we can use the state feature. We need to provide an environment variable called SLING_STATE, which is a location where sling will store the respective incremental values. See for more details.