Skip to main content

Case 4: Customizing Table Names

This guide shows how to customize target table names (raw and qualify) when creating a pipeline in Dadosfera.

Why customize them? By default, the system generates names such as tb__xqfu8g__cadastros__pedidos. Custom naming lets you follow your organization’s conventions and organize data into schemas that represent data maturity.

Authentication

import requests
import json
import os
from pprint import pprint

BASE_URL = "https://maestro.dadosfera.ai"

response = requests.post(
f"{BASE_URL}/auth/sign-in",
data=json.dumps({
"username": os.environ['DADOSFERA_USERNAME'],
"password": os.environ["DADOSFERA_PASSWORD"]
}),
headers={"Content-Type": "application/json"},
)

headers = {
"Authorization": response.json()['tokens']['accessToken'],
"Content-Type": "application/json"
}

Default naming

When you do not specify custom names, the system generates them automatically:

tb__{base32_id}__{source_schema}__{source_table}

Example: tb__xqfu8g__cadastros__pedidos

ComponentDescription
tb__Default prefix
xqfu8gUnique identifier (first 6 characters of the pipeline ID in base32)
cadastrosSource schema
pedidosSource table

Default schemas by load_type:

  • full_load / incremental: table in PUBLIC
  • incremental_with_qualify: raw in PUBLIC, qualify in STAGED

Step 1: Create a pipeline with the default name (reference)

First, let’s see how a pipeline looks without customization:

import uuid
pipeline_id = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id}")
payload = {
"id": pipeline_id,
"name": "Pedidos - Padrão",
"cron": "@once",
"description": "Pipeline com nomenclatura padrão",
"jobs": [
{
"input": {
"connector": "jdbc",
"plugin": "mysql",
"table_schema": "cadastros",
"table_name": "pedidos",
"load_type": "incremental_with_qualify",
"incremental_column_name": "updated_at",
"incremental_column_type": "timestamp",
"primary_keys": ["id"],
"column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
"auth_parameters": {
"auth_type": "connection_manager",
"config_id": "sua-connection-config-id"
}
},
"transformations": [],
"output": {
"plugin": "dadosfera_snowflake"
}
}
]
}

response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())

Result: the tables are created with automatically generated names:

  • Raw: PUBLIC.tb__xxxxxx__cadastros__pedidos
  • Qualify: STAGED.tb__xxxxxx__cadastros__pedidos

Step 2: Create a pipeline with custom names

Now let’s create a pipeline with custom table names and schemas. The schema indicates the maturity level of the data:

  • RAW: raw data, without processing
  • STAGED: processed and deduplicated data
import uuid
pipeline_id_custom = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id_custom}")
payload = {
"id": pipeline_id_custom,
"name": "Pedidos - Customizado",
"cron": "@once",
"description": "Pipeline com nomenclatura personalizada",
"jobs": [
{
"input": {
"connector": "jdbc",
"plugin": "mysql",
"table_schema": "cadastros",
"table_name": "pedidos",
"load_type": "incremental_with_qualify",
"incremental_column_name": "updated_at",
"incremental_column_type": "timestamp",
"primary_keys": ["id"],
"column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
"auth_parameters": {
"auth_type": "connection_manager",
"config_id": "sua-connection-config-id"
}
},
"transformations": [],
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
}
]
}

response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())

Result: the tables are created with the custom names:

  • Raw: RAW.pedidos
  • Qualify: STAGED.pedidos

The same table name (pedidos) can exist in different schemas. The schema indicates maturity: RAW contains raw data and STAGED contains deduplicated data.

Configuration options

Output structure

"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
FieldDescriptionRequired
raw.table_nameRaw table nameYes (if using raw)
raw.table_schemaRaw table schemaYes (if using raw)
qualify.table_nameDeduplicated table nameYes (if using qualify)
qualify.table_schemaQualify table schemaYes (if using qualify)

Validation rules

  1. qualify is only allowed with incremental_with_qualify: if load_type is full_load or incremental, use only raw.
  2. Both blocks are optional: you can customize only raw, only qualify, both, or neither.
  3. Conflict validation: before creating the pipeline, the system checks whether a table with the same name already exists for the customer.

Examples by load type

Full load (raw only)

"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "clientes",
"table_schema": "RAW"
}
}

Incremental (raw only)

"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "eventos",
"table_schema": "RAW"
}
}

Incremental with qualify (raw + qualify)

"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}

Table validation by customer

Dadosfera keeps track of all Snowflake tables by customer. Before creating a pipeline, the system validates the following:

ValidationDescription
UniquenessThe schema.table_name combination cannot already exist for the same customer
Valid schemaThe schema must exist in the customer’s Snowflake environment
Catalog conflictThe table cannot conflict with tables already registered in the catalog

Conflict error (HTTP 409)

If you try to create a pipeline with a table name that already exists:

response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
print(response.status_code) # 409
pprint(response.json())
{
"status": false,
"exception_type": "RepositoryResourceAlreadyExists",
"traceback": "Table 'RAW.pedidos' already exists in catalog",
"data": null
}

Solution: choose a different table name or use a different schema.

Validation error (HTTP 400)

If you try to use qualify with an incompatible load_type:

# ERROR: qualify with full_load
payload = {
# ...
"jobs": [{
"input": {
"load_type": "full_load", # DOES NOT support qualify
# ...
},
"output": {
"plugin": "dadosfera_snowflake",
"qualify": { # INVALID for full_load
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
}]
}
{
"status": false,
"exception_type": "ValidationError",
"traceback": "Output 'qualify' configuration is only allowed when input load_type is 'incremental_with_qualify'",
"data": null
}

Solution: remove the qualify configuration or change the load_type to incremental_with_qualify.

Verify the created tables

After creating the pipeline, inspect the job to see the configured table names:

JOB_ID = f"{pipeline_id_custom}-0"

response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers)
job_config = response.json()

print("Output Config:")
pprint(job_config.get("output_config"))
{
"plugin": "dadosfera_snowflake",
"table_name": "tb__xxxxxx__cadastros__pedidos",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}

Best practices

  1. Use schemas to represent maturity: RAW for raw data, STAGED for processed data.
  2. Keep names simple: reusing the same table name in different schemas makes the data model easier to understand.
  3. Avoid special characters: use only letters, numbers, and underscores.
  4. Plan before creating: once created, that table name is reserved for the pipeline.

Summary

Scenarioraw configurationqualify configuration
full_loadOptionalNot allowed
incrementalOptionalNot allowed
incremental_with_qualifyOptionalOptional

With this feature, you can adapt Snowflake naming to your organization’s standards while preserving automatic validation and integration with Dadosfera’s catalog.