Case 4: Customizing Table Names
This guide shows how to customize target table names (raw and qualify) when creating a pipeline in Dadosfera.
Why customize them? By default, the system generates names such as tb__xqfu8g__cadastros__pedidos. Custom naming lets you follow your organization’s conventions and organize data into schemas that represent data maturity.
Authentication
import requests
import json
import os
from pprint import pprint
BASE_URL = "https://maestro.dadosfera.ai"
response = requests.post(
f"{BASE_URL}/auth/sign-in",
data=json.dumps({
"username": os.environ['DADOSFERA_USERNAME'],
"password": os.environ["DADOSFERA_PASSWORD"]
}),
headers={"Content-Type": "application/json"},
)
headers = {
"Authorization": response.json()['tokens']['accessToken'],
"Content-Type": "application/json"
}
Default naming
When you do not specify custom names, the system generates them automatically:
tb__{base32_id}__{source_schema}__{source_table}
Example: tb__xqfu8g__cadastros__pedidos
| Component | Description |
|---|---|
tb__ | Default prefix |
xqfu8g | Unique identifier (first 6 characters of the pipeline ID in base32) |
cadastros | Source schema |
pedidos | Source table |
Default schemas by load_type:
full_load/incremental: table inPUBLICincremental_with_qualify: raw inPUBLIC, qualify inSTAGED
Step 1: Create a pipeline with the default name (reference)
First, let’s see how a pipeline looks without customization:
import uuid
pipeline_id = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id}")
payload = {
"id": pipeline_id,
"name": "Pedidos - Padrão",
"cron": "@once",
"description": "Pipeline com nomenclatura padrão",
"jobs": [
{
"input": {
"connector": "jdbc",
"plugin": "mysql",
"table_schema": "cadastros",
"table_name": "pedidos",
"load_type": "incremental_with_qualify",
"incremental_column_name": "updated_at",
"incremental_column_type": "timestamp",
"primary_keys": ["id"],
"column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
"auth_parameters": {
"auth_type": "connection_manager",
"config_id": "sua-connection-config-id"
}
},
"transformations": [],
"output": {
"plugin": "dadosfera_snowflake"
}
}
]
}
response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())
Result: the tables are created with automatically generated names:
- Raw:
PUBLIC.tb__xxxxxx__cadastros__pedidos - Qualify:
STAGED.tb__xxxxxx__cadastros__pedidos
Step 2: Create a pipeline with custom names
Now let’s create a pipeline with custom table names and schemas. The schema indicates the maturity level of the data:
RAW: raw data, without processingSTAGED: processed and deduplicated data
import uuid
pipeline_id_custom = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id_custom}")
payload = {
"id": pipeline_id_custom,
"name": "Pedidos - Customizado",
"cron": "@once",
"description": "Pipeline com nomenclatura personalizada",
"jobs": [
{
"input": {
"connector": "jdbc",
"plugin": "mysql",
"table_schema": "cadastros",
"table_name": "pedidos",
"load_type": "incremental_with_qualify",
"incremental_column_name": "updated_at",
"incremental_column_type": "timestamp",
"primary_keys": ["id"],
"column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
"auth_parameters": {
"auth_type": "connection_manager",
"config_id": "sua-connection-config-id"
}
},
"transformations": [],
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
}
]
}
response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())
Result: the tables are created with the custom names:
- Raw:
RAW.pedidos - Qualify:
STAGED.pedidos
The same table name (pedidos) can exist in different schemas. The schema indicates maturity: RAW contains raw data and STAGED contains deduplicated data.
Configuration options
Output structure
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
| Field | Description | Required |
|---|---|---|
raw.table_name | Raw table name | Yes (if using raw) |
raw.table_schema | Raw table schema | Yes (if using raw) |
qualify.table_name | Deduplicated table name | Yes (if using qualify) |
qualify.table_schema | Qualify table schema | Yes (if using qualify) |
Validation rules
qualifyis only allowed withincremental_with_qualify: ifload_typeisfull_loadorincremental, use onlyraw.- Both blocks are optional: you can customize only
raw, onlyqualify, both, or neither. - Conflict validation: before creating the pipeline, the system checks whether a table with the same name already exists for the customer.
Examples by load type
Full load (raw only)
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "clientes",
"table_schema": "RAW"
}
}
Incremental (raw only)
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "eventos",
"table_schema": "RAW"
}
}
Incremental with qualify (raw + qualify)
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
Table validation by customer
Dadosfera keeps track of all Snowflake tables by customer. Before creating a pipeline, the system validates the following:
| Validation | Description |
|---|---|
| Uniqueness | The schema.table_name combination cannot already exist for the same customer |
| Valid schema | The schema must exist in the customer’s Snowflake environment |
| Catalog conflict | The table cannot conflict with tables already registered in the catalog |
Conflict error (HTTP 409)
If you try to create a pipeline with a table name that already exists:
response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
print(response.status_code) # 409
pprint(response.json())
{
"status": false,
"exception_type": "RepositoryResourceAlreadyExists",
"traceback": "Table 'RAW.pedidos' already exists in catalog",
"data": null
}
Solution: choose a different table name or use a different schema.
Validation error (HTTP 400)
If you try to use qualify with an incompatible load_type:
# ERROR: qualify with full_load
payload = {
# ...
"jobs": [{
"input": {
"load_type": "full_load", # DOES NOT support qualify
# ...
},
"output": {
"plugin": "dadosfera_snowflake",
"qualify": { # INVALID for full_load
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
}]
}
{
"status": false,
"exception_type": "ValidationError",
"traceback": "Output 'qualify' configuration is only allowed when input load_type is 'incremental_with_qualify'",
"data": null
}
Solution: remove the qualify configuration or change the load_type to incremental_with_qualify.
Verify the created tables
After creating the pipeline, inspect the job to see the configured table names:
JOB_ID = f"{pipeline_id_custom}-0"
response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers)
job_config = response.json()
print("Output Config:")
pprint(job_config.get("output_config"))
{
"plugin": "dadosfera_snowflake",
"table_name": "tb__xxxxxx__cadastros__pedidos",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
Best practices
- Use schemas to represent maturity:
RAWfor raw data,STAGEDfor processed data. - Keep names simple: reusing the same table name in different schemas makes the data model easier to understand.
- Avoid special characters: use only letters, numbers, and underscores.
- Plan before creating: once created, that table name is reserved for the pipeline.
Summary
| Scenario | raw configuration | qualify configuration |
|---|---|---|
full_load | Optional | Not allowed |
incremental | Optional | Not allowed |
incremental_with_qualify | Optional | Optional |
With this feature, you can adapt Snowflake naming to your organization’s standards while preserving automatic validation and integration with Dadosfera’s catalog.