
Case 2: Migrating from Incremental to Incremental with Qualify

This guide shows how to migrate an incremental pipeline to incremental_with_qualify, which adds automatic primary-key deduplication.

Problem: incremental pipelines capture every version of a record, so a record updated between runs appears more than once in the target.

Solution: migrate to incremental_with_qualify, which keeps only the most recent version of each record.
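The rule "keep only the most recent version of each record" can be illustrated in a few lines. The mode's name presumably refers to the SQL pattern `QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1`. The Python sketch below applies that rule to in-memory rows. It is an illustration of the semantics under that assumption, not Dadosfera's implementation, and the helper `keep_latest` and the sample rows are hypothetical.

```python
from typing import Any, Iterable


def keep_latest(
    rows: Iterable[dict[str, Any]],
    primary_keys: list[str],
    order_column: str,
) -> list[dict[str, Any]]:
    """Keep one row per primary key: the one with the greatest order_column value.

    Hypothetical helper illustrating QUALIFY-style deduplication.
    """
    latest: dict[tuple, dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[col] for col in primary_keys)
        current = latest.get(key)
        # Replace the stored row only if this version is strictly newer
        if current is None or row[order_column] > current[order_column]:
            latest[key] = row
    return list(latest.values())


# Hypothetical sample: id 1 was captured twice by a plain incremental load
rows = [
    {"id": 1, "name": "Ana", "updated_at": "2024-01-01T10:00:00"},
    {"id": 1, "name": "Ana Souza", "updated_at": "2024-01-02T09:00:00"},
    {"id": 2, "name": "Bruno", "updated_at": "2024-01-01T12:00:00"},
]
for row in keep_latest(rows, ["id"], "updated_at"):
    print(row)
```

ISO 8601 timestamp strings sort chronologically, so the comparison works here without parsing. With real data you would compare `datetime` values.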

Authentication

```python
import os
from pprint import pprint

import requests

BASE_URL = "https://maestro.dadosfera.ai"

# Sign in with credentials read from environment variables
response = requests.post(
    f"{BASE_URL}/auth/sign-in",
    json={
        "username": os.environ["DADOSFERA_USERNAME"],
        "password": os.environ["DADOSFERA_PASSWORD"],
    },
)
response.raise_for_status()  # fail early on bad credentials or server errors

# Send the access token with every subsequent request
headers = {
    "Authorization": response.json()["tokens"]["accessToken"],
    "Content-Type": "application/json",
}
```
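If several scripts call the API, it can help to isolate header construction in a small function that fails with a clear message when the sign-in response has no token. The following is a minimal sketch that relies only on the `tokens.accessToken` field used above. `build_auth_headers` is a hypothetical name.

```python
from typing import Any


def build_auth_headers(sign_in_body: dict[str, Any]) -> dict[str, str]:
    """Build request headers from a parsed /auth/sign-in response body.

    Hypothetical helper: relies only on the tokens.accessToken field.
    """
    try:
        token = sign_in_body["tokens"]["accessToken"]
    except (KeyError, TypeError) as exc:
        raise ValueError("sign-in response has no tokens.accessToken") from exc
    return {"Authorization": token, "Content-Type": "application/json"}


# Stand-in body for illustration (not a real token)
print(build_auth_headers({"tokens": {"accessToken": "example-token"}}))
```

With this helper, the last statement of the authentication code could read `headers = build_auth_headers(response.json())`.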

Step 1: Identify the pipeline and job

We will use the same pipeline created in Case 1.

```python
PIPELINE_ID = "7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae"
JOB_ID = "7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae-0"

# Fetch the current job configuration
response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers)
response.raise_for_status()
job_config = response.json()

source_config = job_config.get("source_config", {})
print(f"Current load_type: {source_config.get('load_type')}")
print(f"primary_keys: {source_config.get('primary_keys', 'NOT DEFINED')}")
```

```
Current load_type: incremental
primary_keys: None
```
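Before migrating, you may want to confirm programmatically that the job is still plain incremental and has no primary keys. The following sketch assumes only the `source_config.load_type` and `source_config.primary_keys` fields read above. `needs_qualify_migration` is a hypothetical name.

```python
from typing import Any


def needs_qualify_migration(job_config: dict[str, Any]) -> bool:
    """Return True if the job is plain incremental and has no primary keys.

    Hypothetical check based on the source_config fields read in Step 1.
    """
    source_config = job_config.get("source_config") or {}
    is_incremental = source_config.get("load_type") == "incremental"
    has_keys = bool(source_config.get("primary_keys"))
    return is_incremental and not has_keys


example = {"source_config": {"load_type": "incremental", "primary_keys": None}}
print(needs_qualify_migration(example))
```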

Step 2: Migrate the sync mode

The POST /platform/jobs/jdbc/{jobId}/sync-mode endpoint performs the migration with full validation.

Required parameters:

  • target_load_type: the new sync mode.
  • primary_keys: the columns that uniquely identify each record.
  • incremental_column_name and incremental_column_type: the name and data type of the column that controls incremental loading (here, updated_at as a timestamp).
```python
payload = {
    "target_load_type": "incremental_with_qualify",
    "incremental_column_name": "updated_at",
    "incremental_column_type": "timestamp",
    "primary_keys": ["id"],
}

# Request the migration; the endpoint validates the request before applying it
response = requests.post(
    f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}/sync-mode",
    headers=headers,
    json=payload,
)
pprint(response.json())
```

```
{
  "message": "Successfully migrated sync mode to incremental_with_qualify",
  "migration_details": {
    "state_action": "preserve",
    "target_load_type": "incremental_with_qualify",
    "warnings": []
  },
  "status": "success"
}
```
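Because all four parameters are required, a local check can catch an incomplete payload before the request is sent. The response can also be inspected for `status` and `warnings` rather than only printed. The following sketch assumes the response shape shown above. `missing_fields` and `migration_summary` are hypothetical helpers, and treating an empty value (such as `primary_keys: []`) as missing is an assumption.

```python
from typing import Any

# Fields listed as required for the sync-mode request
REQUIRED_FIELDS = (
    "target_load_type",
    "incremental_column_name",
    "incremental_column_type",
    "primary_keys",
)


def missing_fields(payload: dict[str, Any]) -> list[str]:
    """Return the required fields that are absent or empty in the payload."""
    return [field for field in REQUIRED_FIELDS if not payload.get(field)]


def migration_summary(body: dict[str, Any]) -> tuple[str, list[str]]:
    """Return (state_action, warnings) from a successful sync-mode response.

    Raises RuntimeError if the response status is not "success".
    """
    if body.get("status") != "success":
        raise RuntimeError(f"migration failed: {body}")
    details = body.get("migration_details", {})
    return details.get("state_action", ""), details.get("warnings", [])


# The response shown above, as a Python dict
example_response = {
    "message": "Successfully migrated sync mode to incremental_with_qualify",
    "migration_details": {
        "state_action": "preserve",
        "target_load_type": "incremental_with_qualify",
        "warnings": [],
    },
    "status": "success",
}
print(missing_fields({"target_load_type": "incremental_with_qualify", "primary_keys": ["id"]}))
print(migration_summary(example_response))
```

In the real flow, you would call `missing_fields(payload)` before `requests.post` and `migration_summary(response.json())` after it.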

Step 3: Verify the change

In the Step 2 response, state_action: "preserve" means the incremental state was kept, so the next execution continues from the point where the previous one stopped.

```python
# Re-read the job configuration to confirm the new sync mode
response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers)
job_config = response.json()

source_config = job_config.get("source_config", {})
print(f"load_type: {source_config.get('load_type')}")
print(f"primary_keys: {source_config.get('primary_keys')}")
```

```
load_type: incremental_with_qualify
primary_keys: ['id']
```
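For scripted checks, the comparison can return a list of differences instead of relying on visual inspection. The following sketch reads the same `source_config` fields as above. `config_mismatches` is a hypothetical name, and the exact list comparison assumes the API returns primary keys in the order they were sent.

```python
from typing import Any


def config_mismatches(
    job_config: dict[str, Any],
    expected_load_type: str,
    expected_primary_keys: list[str],
) -> list[str]:
    """List human-readable differences between a job config and expectations."""
    source_config = job_config.get("source_config") or {}
    problems = []
    load_type = source_config.get("load_type")
    if load_type != expected_load_type:
        problems.append(f"load_type is {load_type!r}, expected {expected_load_type!r}")
    primary_keys = source_config.get("primary_keys")
    if primary_keys != expected_primary_keys:
        problems.append(f"primary_keys is {primary_keys!r}, expected {expected_primary_keys!r}")
    return problems


migrated = {"source_config": {"load_type": "incremental_with_qualify", "primary_keys": ["id"]}}
print(config_mismatches(migrated, "incremental_with_qualify", ["id"]))
```

An empty list means the migration is reflected in the job configuration.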

Step 4: Execute the pipeline

```python
# Trigger a run of the migrated pipeline
payload = {"pipeline_id": PIPELINE_ID}

response = requests.post(
    f"{BASE_URL}/platform/pipeline/execute",
    headers=headers,
    json=payload,
)
pprint(response.json())
```
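In automated runs, it is usually safer to set a timeout and raise on HTTP errors when triggering the pipeline. This guide does not show the execute endpoint's response body, so the sketch below simply returns the parsed JSON. `execute_pipeline` is a hypothetical helper, and the 30-second timeout is an arbitrary assumption. The function accepts any object with a requests-style `post` method, such as `requests.Session()`.

```python
from typing import Any, Protocol


class SupportsPost(Protocol):
    """Anything with a requests-style post(url, **kwargs) method."""

    def post(self, url: str, **kwargs: Any) -> Any: ...


def execute_pipeline(
    session: SupportsPost,
    base_url: str,
    headers: dict[str, str],
    pipeline_id: str,
    timeout: float = 30.0,  # assumed value; tune for your environment
) -> Any:
    """POST to /platform/pipeline/execute and return the parsed JSON body."""
    response = session.post(
        f"{base_url}/platform/pipeline/execute",
        headers=headers,
        json={"pipeline_id": pipeline_id},
        timeout=timeout,
    )
    response.raise_for_status()  # surface 4xx/5xx errors instead of printing them
    return response.json()
```

Usage, with the names defined earlier in this guide: `pprint(execute_pipeline(requests.Session(), BASE_URL, headers, PIPELINE_ID))`.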

Result

[Figure: incremental_with_qualify result]

With incremental_with_qualify, a new table is created in the STAGED schema containing only the most recent version of each record. Both the schema and table names can be customized through the API.

Results:

  1. IDs are deduplicated: each id appears in exactly one record.
  2. The new telefone column is now present in the data.
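To confirm the first result on rows read from the staged table, you can count rows per primary key. The SQL equivalent is `GROUP BY id HAVING COUNT(*) > 1`, and an empty result means each id appears once. The sketch assumes the rows are already loaded as dictionaries, because querying the STAGED table is outside the scope of this guide. `duplicate_keys` and the sample rows are hypothetical.

```python
from collections import Counter
from typing import Any, Iterable


def duplicate_keys(
    rows: Iterable[dict[str, Any]], primary_keys: list[str]
) -> dict[tuple, int]:
    """Return each primary-key tuple that appears more than once, with its count."""
    counts = Counter(tuple(row[col] for col in primary_keys) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical rows: before deduplication id 1 appears twice, after it appears once
before = [{"id": 1}, {"id": 1}, {"id": 2}]
after = [{"id": 1}, {"id": 2}]
print(duplicate_keys(before, ["id"]))
print(duplicate_keys(after, ["id"]))
```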