Skip to main content

Spark Connect em Data Apps | Processar | Documentação

Overview

Este guia demonstra como utilizar o Spark Connect em Data Apps Python (Streamlit, Flask, FastAPI) na plataforma Dadosfera para processamento distribuído de dados.

Configuração do Projeto

Dependências (requirements.txt)

"pyspark>4.0.0"
"numpy<2"
pandas>=2.0.0
pyarrow>=14.0.0
grpcio>=1.60.0
grpcio-status>=1.60.0
protobuf>=4.25.0
streamlit>=1.30.0
snowflake-connector-python>=3.6.0

Módulo de Conexão Reutilizável

spark_utils.py

Crie um módulo utilitário para gerenciar conexões:

"""
Utilitários para conexão com Spark Connect
"""
from contextlib import contextmanager
from pyspark.sql import SparkSession
import logging

logger = logging.getLogger(__name__)

SPARK_CONNECT_URL = "sc://spark-connect.spark-jobs.svc.cluster.local:15002"


@contextmanager
def get_spark_session():
"""
Context manager para sessão Spark Connect.

Uso:
with get_spark_session() as spark:
df = spark.range(100)
df.show()
"""
spark = None
try:
logger.info("Conectando ao Spark Connect...")
spark = SparkSession.builder \
.remote(SPARK_CONNECT_URL) \
.getOrCreate()
logger.info(f"Conectado! Spark version: {spark.version}")
yield spark
except Exception as e:
logger.error(f"Erro na conexão Spark: {e}")
raise
finally:
if spark:
spark.stop()
logger.info("Sessão Spark encerrada")


def create_spark_session() -> SparkSession:
"""
Cria sessão Spark Connect.
Lembre-se de chamar spark.stop() quando terminar.
"""
return SparkSession.builder \
.remote(SPARK_CONNECT_URL) \
.getOrCreate()

Exemplo: Data App com Streamlit

app.py

import streamlit as st
import pandas as pd
from spark_utils import get_spark_session
from pyspark.sql.functions import col, sum as spark_sum, avg, count

st.set_page_config(page_title="Analytics com Spark", layout="wide")

st.title("Dashboard de Analytics")
st.markdown("Processamento distribuído com Spark Connect")


@st.cache_data(ttl=300)
def processar_dados_spark(query_params: dict) -> pd.DataFrame:
"""
Processa dados usando Spark Connect.
Resultados são cacheados por 5 minutos.
"""
with get_spark_session() as spark:
# Simular dados (substitua pela sua fonte real)
dados = []
for i in range(100000):
dados.append((
f"Produto_{i % 100}",
f"Categoria_{i % 10}",
(i % 1000) * 10.5,
f"2024-{(i % 12) + 1:02d}-01"
))

df = spark.createDataFrame(
dados,
["produto", "categoria", "valor", "data"]
)

# Aplicar filtros
if query_params.get("categoria"):
df = df.filter(col("categoria") == query_params["categoria"])

if query_params.get("valor_minimo"):
df = df.filter(col("valor") >= query_params["valor_minimo"])

# Agregar
resultado = df.groupBy("categoria").agg(
spark_sum("valor").alias("total"),
avg("valor").alias("media"),
count("*").alias("quantidade")
).orderBy(col("total").desc())

return resultado.toPandas()


# Interface do usuário
st.sidebar.header("Filtros")

categoria = st.sidebar.selectbox(
"Categoria",
options=["Todas"] + [f"Categoria_{i}" for i in range(10)]
)

valor_minimo = st.sidebar.slider(
"Valor Mínimo",
min_value=0,
max_value=10000,
value=0
)

# Processar dados
params = {
"categoria": None if categoria == "Todas" else categoria,
"valor_minimo": valor_minimo
}

if st.button("Processar Dados"):
with st.spinner("Processando com Spark..."):
df_resultado = processar_dados_spark(params)

# Exibir resultados
col1, col2, col3 = st.columns(3)

with col1:
st.metric("Total Geral", f"R$ {df_resultado['total'].sum():,.2f}")

with col2:
st.metric("Média Geral", f"R$ {df_resultado['media'].mean():,.2f}")

with col3:
st.metric("Transações", f"{df_resultado['quantidade'].sum():,}")

st.subheader("Dados por Categoria")
st.dataframe(df_resultado, use_container_width=True)

st.bar_chart(df_resultado.set_index("categoria")["total"])

Exemplo: API com FastAPI

main.py

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import pandas as pd
from spark_utils import get_spark_session
from pyspark.sql.functions import col, sum as spark_sum
from pyspark import Row

app = FastAPI(title="Spark Processing API")


class ProcessingRequest(BaseModel):
categoria: Optional[str] = None
data_inicio: Optional[str] = None
data_fim: Optional[str] = None
agrupar_por: List[str] = ["categoria"]


class ProcessingResult(BaseModel):
status: str
registros_processados: int
resultado: List[dict]


@app.post("/processar", response_model=ProcessingResult)
async def processar_dados(request: ProcessingRequest):
"""
Endpoint para processamento de dados com Spark Connect.
"""
try:
with get_spark_session() as spark:
# Carregar dados (substitua pela sua fonte)
# Criar DataFrame de exemplo
data = [
Row(categoria="eletrônicos", data="2024-01-15", valor=1500.00),
Row(categoria="roupas", data="2024-01-20", valor=250.00),
Row(categoria="eletrônicos", data="2024-02-10", valor=3200.00),
Row(categoria="alimentos", data="2024-02-15", valor=89.90),
Row(categoria="roupas", data="2024-03-01", valor=450.00),
]
df = spark.createDataFrame(data)
# Aplicar filtros
if request.categoria:
df = df.filter(col("categoria") == request.categoria)

if request.data_inicio:
df = df.filter(col("data") >= request.data_inicio)

if request.data_fim:
df = df.filter(col("data") <= request.data_fim)

# Contar registros antes da agregação
total_registros = df.count()

# Agregar
resultado = df.groupBy(*request.agrupar_por).agg(
spark_sum("valor").alias("total")
)

# Converter para lista de dicts
resultado_list = resultado.toPandas().to_dict(orient="records")

return ProcessingResult(
status="success",
registros_processados=total_registros,
resultado=resultado_list
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health_check():
"""Verifica se o serviço está funcionando."""
return {"status": "healthy"}


@app.get("/spark/status")
async def spark_status():
"""Verifica conexão com Spark Connect."""
try:
with get_spark_session() as spark:
version = spark.version
return {
"status": "connected",
"spark_version": version
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}

Boas Práticas para Data Apps

1. Use Caching

@st.cache_data(ttl=300)  # Cache por 5 minutos
def processar_com_spark(params):
with get_spark_session() as spark:
# Processamento...
return resultado.toPandas()

2. Tratamento de Erros

try:
with get_spark_session() as spark:
resultado = processar(spark)
except Exception as e:
st.error(f"Erro no processamento: {e}")
logger.exception("Erro Spark")

3. Feedback ao Usuário

with st.spinner("Processando dados..."):
progress = st.progress(0)
# ... processamento
progress.progress(100)

4. Limite Resultados para UI

# Sempre limite dados retornados para a interface
resultado = spark_df.limit(10000).toPandas()