DZone
Big Data

Big data comprises datasets that are massive, varied, and complex, and that can't be handled with traditional data-processing tools. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.

Latest Premium Content
Trend Report
Cognitive Databases, Intelligent Data
Refcard #269
Getting Started With Data Quality
Refcard #254
Apache Kafka Essentials

DZone's Featured Big Data Resources

Fine-Tuning LLMs at Scale With Databricks MLflow and Spark


By Jubin Abhishek Soni (DZone Core)
Why Fine-Tune on Databricks?

General-purpose LLMs like Llama 3, Mistral, or Falcon are impressive out of the box, but they underperform on domain-specific tasks: medical coding, legal clause extraction, internal support ticket classification, and financial report summarization. Fine-tuning adapts a pre-trained model's weights to your domain using your proprietary labeled data.

Doing this at scale introduces real engineering challenges:

- Training data lives in Delta Lake across dozens of tables.
- GPU clusters need to be orchestrated, not hand-managed.
- Experiment tracking must be reproducible and auditable.
- Models need a promotion workflow before they touch production traffic.

Databricks addresses all of this in one platform:

- Apache Spark for large-scale data preparation
- MLflow (built-in) for experiment tracking, model registry, and lineage
- Databricks Model Serving for one-click deployment with auto-scaling
- Unity Catalog for governed model and data access

The ML Lifecycle Architecture

[Figure: ML lifecycle architecture on Databricks]

Training Pipeline: End-to-End Flow

[Figure: how a single training run moves through the system, from a triggered job to a promoted model alias]
Environment Setup

```python
# Databricks Runtime ML 14.x+ recommended (ships CUDA, PyTorch, Transformers)
# Install additional packages in your cluster init script or notebook.
# Run %pip in its own notebook cell.
%pip install \
    transformers==4.40.0 \
    peft==0.10.0 \
    trl==0.8.6 \
    accelerate==0.29.3 \
    bitsandbytes \
    horovod[spark]==0.28.1 \
    datasets==2.19.0 \
    evaluate==0.4.1 \
    --quiet
# bitsandbytes is required for 4-bit loading via BitsAndBytesConfig

# Next cell: restart Python so the new packages are picked up
dbutils.library.restartPython()

# Next cell: imports and configuration
import os
import mlflow
import mlflow.transformers
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
)
from peft import LoraConfig, get_peft_model, TaskType
from pyspark.sql import functions as F
from datasets import Dataset

# ── MLflow setup ──────────────────────────────────────────────────────────────
# On Databricks, the MLflow tracking URI is pre-configured to the workspace
# mlflow.set_tracking_uri("databricks")  # uncomment for external clusters
# Three-level names (catalog.schema.model) require the Unity Catalog registry
mlflow.set_registry_uri("databricks-uc")

EXPERIMENT_NAME = "/Users/<your-email>/llm-finetuning/support-classifier"
mlflow.set_experiment(EXPERIMENT_NAME)

BASE_MODEL = "mistralai/Mistral-7B-Instruct-v0.2"
CATALOG    = "prod"
GOLD_DB    = f"{CATALOG}.gold"
MODEL_NAME = f"{CATALOG}.ml.support_intent_classifier"  # Unity Catalog model path

print(f"GPU available: {torch.cuda.is_available()}")
print(f"Device count: {torch.cuda.device_count()}")
```

Preparing Training Data With Spark

Spark handles the heavy lifting before training: filtering noisy records, formatting prompt-response pairs, and splitting the dataset. This stage runs on a CPU cluster; GPU nodes spin up only for the actual training job.

```python
# ── Spark Data Preparation ────────────────────────────────────────────────────
def build_prompt(message: str, label: str) -> str:
    """
    Format a support message into an instruction-following prompt.
    Uses the Mistral instruct template: [INST] ... [/INST] <label>
    """
    return f"[INST] Classify the intent of this support message:\n\n{message} [/INST] {label}"


# Load from the Delta Gold table
raw_df = (
    spark.table(f"{GOLD_DB}.support_conversations")
    .filter(F.col("quality_score") >= 0.85)    # keep high-quality labels only
    .filter(F.col("intent_label").isNotNull())
    .filter(F.length("message") > 20)          # drop empty/stub messages
    .filter(F.length("message") < 2048)        # drop messages too long to tokenize
    .dropDuplicates(["message_hash"])          # remove exact duplicates
    .select("message", "intent_label", "created_date")
    .limit(500_000)                            # cap for this training run
)

print(f"Training candidates: {raw_df.count():,}")

# Build prompt strings with a Spark UDF, parallelized across all workers
prompt_udf = F.udf(build_prompt, returnType="string")

prepared_df = (
    raw_df
    .withColumn("prompt", prompt_udf(F.col("message"), F.col("intent_label")))
    # Rough word-count proxy; real token counts run higher than word counts
    .withColumn("token_count", F.size(F.split(F.col("prompt"), r"\s+")))
    .filter(F.col("token_count") < 512)        # stay within the 512-token max_length used for training
    .select("prompt", "token_count", "created_date")
)

# Random 80/10/10 split (not stratified by intent); the fixed seed makes it repeatable for the same input
train_df, val_df, test_df = prepared_df.randomSplit([0.80, 0.10, 0.10], seed=42)

# Persist splits to Delta for lineage + reproducibility
train_df.write.format("delta").mode("overwrite").saveAsTable(f"{GOLD_DB}.llm_train_split")
val_df.write.format("delta").mode("overwrite").saveAsTable(f"{GOLD_DB}.llm_val_split")
test_df.write.format("delta").mode("overwrite").saveAsTable(f"{GOLD_DB}.llm_test_split")

# Count the persisted tables (re-evaluating the lineage could give different rows because of limit())
n_train = spark.table(f"{GOLD_DB}.llm_train_split").count()
n_val   = spark.table(f"{GOLD_DB}.llm_val_split").count()
n_test  = spark.table(f"{GOLD_DB}.llm_test_split").count()
print(f"Train: {n_train:,} | Val: {n_val:,} | Test: {n_test:,}")
```

Fine-Tuning With Hugging Face + MLflow Tracking

We use LoRA (Low-Rank Adaptation), a parameter-efficient fine-tuning technique that freezes the base model and trains only a small set of adapter matrices. This cuts GPU memory requirements by ~70% compared to full fine-tuning, making 7B-parameter models trainable on a single A100. The code below also loads the frozen base model in 4-bit precision, which makes this QLoRA.

```python
# ── LoRA Fine-Tuning with MLflow Tracking ─────────────────────────────────────
# Convert the persisted Spark splits to Hugging Face Datasets (via pandas on the driver)
train_pd = spark.table(f"{GOLD_DB}.llm_train_split").select("prompt").toPandas()
val_pd   = spark.table(f"{GOLD_DB}.llm_val_split").select("prompt").toPandas()

hf_train = Dataset.from_pandas(train_pd)
hf_val   = Dataset.from_pandas(val_pd)

# Load tokenizer and base model
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, padding_side="right")
tokenizer.pad_token = tokenizer.eos_token  # Mistral ships without a pad token

def tokenize(batch):
    return tokenizer(
        batch["prompt"],
        truncation=True,
        max_length=512,
        padding="max_length",
    )

hf_train_tok = hf_train.map(tokenize, batched=True, remove_columns=["prompt"])
hf_val_tok   = hf_val.map(tokenize, batched=True, remove_columns=["prompt"])

# Load the base model with 4-bit quantization (QLoRA)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)

base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
)

# Apply the LoRA adapter config
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=16,                                  # rank: higher = more capacity, more memory
    lora_alpha=32,                         # scaling factor
    lora_dropout=0.05,
    target_modules=["q_proj", "v_proj"],   # attention projections to adapt
    bias="none",
)

model = get_peft_model(base_model, lora_config)
model.print_trainable_parameters()
# Prints trainable vs. total parameter counts. For Mistral-7B with r=16 on
# q_proj/v_proj, expect about 6.8M trainable parameters (well under 1% of the total).

# Training arguments
training_args = TrainingArguments(
    output_dir="/dbfs/tmp/llm-finetune/checkpoints",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=8,   # effective batch size = 4 × 8 = 32
    warmup_ratio=0.03,
    learning_rate=2e-4,
    fp16=False,
    bf16=True,                       # use bfloat16 on A100/H100
    logging_steps=50,
    evaluation_strategy="steps",     # named eval_strategy in later transformers releases
    eval_steps=200,
    save_strategy="steps",
    save_steps=200,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    report_to="mlflow",              # send all Trainer metrics to MLflow automatically
)

data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=hf_train_tok,
    eval_dataset=hf_val_tok,
    tokenizer=tokenizer,
    data_collator=data_collator,
)

# ── MLflow Run ────────────────────────────────────────────────────────────────
with mlflow.start_run(run_name="mistral-7b-lora-v1") as run:

    # Log hyperparameters manually for full auditability
    mlflow.log_params({
        "base_model": BASE_MODEL,
        "lora_rank": lora_config.r,
        "lora_alpha": lora_config.lora_alpha,
        "lora_dropout": lora_config.lora_dropout,
        "target_modules": str(lora_config.target_modules),
        "quantization": "4-bit QLoRA (nf4)",
        "train_samples": len(hf_train_tok),
        "val_samples": len(hf_val_tok),
        "epochs": training_args.num_train_epochs,
        "effective_batch": training_args.per_device_train_batch_size
                           * training_args.gradient_accumulation_steps,
        "learning_rate": training_args.learning_rate,
    })

    # Train; metrics are auto-logged to MLflow via report_to="mlflow"
    trainer.train()

    # Log final eval metrics explicitly (perplexity = exp(eval_loss))
    eval_results = trainer.evaluate()
    mlflow.log_metrics({
        "final_eval_loss": eval_results["eval_loss"],
        "final_eval_perplexity": torch.exp(torch.tensor(eval_results["eval_loss"])).item(),
    })

    # Log the model + tokenizer as a single MLflow artifact
    mlflow.transformers.log_model(
        transformers_model={"model": trainer.model, "tokenizer": tokenizer},
        artifact_path="model",
        task="text-generation",
        registered_model_name=MODEL_NAME,  # registers to Unity Catalog (registry URI set above)
        metadata={"base_model": BASE_MODEL, "finetuning": "QLoRA"},
    )

    run_id = run.info.run_id
    print(f"Run ID: {run_id}")
    print(f"Eval Loss: {eval_results['eval_loss']:.4f}")
```
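To see why LoRA is so cheap, it helps to count parameters. For a frozen weight matrix with input size d_in and output size d_out, a rank-r adapter adds two matrices, A (r × d_in) and B (d_out × r), so it trains r × (d_in + d_out) parameters instead of d_in × d_out. The sketch below applies that to the configuration above (r=16 on q_proj and v_proj). The layer shapes are assumptions taken from Mistral-7B's published configuration (hidden size 4096, 32 layers, 8 key/value heads of dimension 128, so v_proj outputs 1024); check them against your model's config.json.

```python
def lora_params(d_in: int, d_out: int, r: int) -> int:
    """Trainable parameters of one LoRA adapter: A is (r x d_in), B is (d_out x r)."""
    return r * d_in + d_out * r


def count_lora_trainable(num_layers: int, target_shapes: dict, r: int) -> int:
    """Sum adapter parameters over every targeted projection in every layer.

    target_shapes maps a module name (e.g. "q_proj") to its (d_in, d_out) shape.
    """
    per_layer = sum(lora_params(d_in, d_out, r) for d_in, d_out in target_shapes.values())
    return num_layers * per_layer


# Assumed Mistral-7B shapes: v_proj projects to 8 KV heads x 128 dims = 1024
MISTRAL_7B_TARGETS = {
    "q_proj": (4096, 4096),
    "v_proj": (4096, 1024),
}

trainable = count_lora_trainable(num_layers=32, target_shapes=MISTRAL_7B_TARGETS, r=16)
# Parameters in the frozen q_proj/v_proj matrices the adapters sit beside
frozen_q_v = 32 * sum(d_in * d_out for d_in, d_out in MISTRAL_7B_TARGETS.values())

print(f"LoRA trainable params: {trainable:,}")        # 6,815,744
print(f"Frozen q_proj + v_proj params: {frozen_q_v:,}")  # 671,088,640
```

Because the count is linear in r, doubling the rank doubles the adapter size; adding k_proj, o_proj, or the MLP projections to target_modules grows it further.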
Distributed Training With Horovod on Spark

For datasets beyond a few million tokens, single-node training can take longer than your time budget allows. Horovod distributes training across multiple GPU workers using ring-allreduce: each worker holds a full model replica, trains on its own data shard, and gradients are averaged across workers after every backward pass. Because every GPU still holds the entire model (plus its gradients and optimizer state), this data-parallel approach shortens wall-clock time but does not raise the per-GPU memory ceiling; a model that doesn't fit on one GPU won't fit under Horovod either.

```python
# ── Distributed Fine-Tuning with Horovod on Spark ────────────────────────────
# Best for: datasets > 5M tokens, or when you need to reduce wall-clock
# training time below a business SLA. Each GPU holds a full model replica,
# so the model must fit on a single GPU.
import horovod.torch as hvd
from sparkdl import HorovodRunner

def train_fn(hparams):
    """
    Training function executed on each Horovod worker.
    Each worker trains on its own data shard; gradients are averaged across workers.
    Shards are expected to be pre-tokenized datasets saved with save_to_disk().
    """
    import mlflow
    import mlflow.transformers
    import torch
    import horovod.torch as hvd
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        DataCollatorForLanguageModeling,
        Trainer,
        TrainingArguments,
    )
    from datasets import load_from_disk

    hvd.init()
    rank = hvd.rank()              # global rank: unique across all nodes
    local_rank = hvd.local_rank()  # GPU index on this node
    torch.cuda.set_device(local_rank)

    # Load this worker's shard. Key shards by the global rank: local ranks repeat
    # on every node, so keying by local_rank would make nodes train on the same data.
    dataset = load_from_disk(f"/dbfs/tmp/llm-finetune/train_shards/shard_{rank}")

    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
    tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL,
        torch_dtype=torch.bfloat16,
    ).to(f"cuda:{local_rank}")

    # Wrap the optimizer with Horovod's DistributedOptimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=hparams["lr"])
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        compression=hvd.Compression.fp16,  # compress gradient communication
    )

    # Broadcast initial model weights and optimizer state from rank 0 to all workers
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    training_args = TrainingArguments(
        output_dir="/dbfs/tmp/llm-finetune/hvd_output",
        num_train_epochs=hparams["epochs"],
        per_device_train_batch_size=hparams["batch_size"],
        bf16=True,
        dataloader_num_workers=2,
        # Only rank 0 logs and saves, to avoid duplicated artifacts
        report_to="mlflow" if rank == 0 else "none",
        save_strategy="epoch" if rank == 0 else "no",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        data_collator=DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False),
        optimizers=(optimizer, None),
    )
    trainer.train()

    # Only rank 0 registers the model
    if rank == 0:
        mlflow.set_registry_uri("databricks-uc")
        mlflow.transformers.log_model(
            transformers_model={"model": trainer.model, "tokenizer": tokenizer},
            artifact_path="model",
            task="text-generation",
            registered_model_name=MODEL_NAME,
        )

# Launch distributed training across N GPU workers
# np = number of processes = number of GPUs across all nodes
hr = HorovodRunner(np=8, driver_log_verbosity="all")  # 8 GPUs (e.g., 2 × 4-GPU nodes)
hr.run(train_fn, hparams={
    "lr": 2e-5,
    "epochs": 3,
    "batch_size": 2,  # per GPU; effective = 2 × 8 = 16
})
```

MLflow Model Registry and Promotion

Once a run completes, models land in the MLflow Model Registry. With Unity Catalog, Databricks uses model aliases (candidate, staging, champion) instead of the legacy stage-based workflow.
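Before the real client calls, it can help to make the promotion rule explicit. The following is a minimal, hypothetical sketch in plain Python, not an MLflow API: `AliasTable` models an alias as a mutable pointer from a name to a version (which is why a rollback is just re-pointing the alias), and `should_promote` is an assumed gate that promotes a candidate only if it lowers eval loss by at least a chosen margin.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class AliasTable:
    """Toy stand-in for registry aliases: alias name -> model version (hypothetical)."""
    aliases: Dict[str, str] = field(default_factory=dict)
    history: Dict[str, List[str]] = field(default_factory=dict)

    def set_alias(self, alias: str, version: str) -> None:
        # Remember the previous target so the alias can be rolled back
        previous = self.aliases.get(alias)
        if previous is not None:
            self.history.setdefault(alias, []).append(previous)
        self.aliases[alias] = version

    def rollback(self, alias: str) -> Optional[str]:
        """Re-point the alias to its previous version; returns it, or None if there is none."""
        if not self.history.get(alias):
            return None
        restored = self.history[alias].pop()
        self.aliases[alias] = restored
        return restored


def should_promote(candidate_loss: float, champion_loss: Optional[float], min_gain: float = 0.01) -> bool:
    """Promote if there is no champion yet, or the candidate lowers eval loss by at least min_gain."""
    if champion_loss is None:
        return True
    return champion_loss - candidate_loss >= min_gain


table = AliasTable()
table.set_alias("champion", "3")
if should_promote(candidate_loss=0.82, champion_loss=0.90):
    table.set_alias("champion", "4")
print(table.aliases["champion"])  # 4

table.rollback("champion")
print(table.aliases["champion"])  # 3
```

The real registry does not keep alias history for you, so in practice you would record the previous champion version yourself and call `set_registered_model_alias` with it to roll back.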
```python
# ── Model Registry Promotion Workflow ─────────────────────────────────────────
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Find the newest registered version. Unity Catalog models have no stages,
# so stage-based `latest_versions` isn't available; search the versions instead.
versions = client.search_model_versions(f"name='{MODEL_NAME}'")
latest_version = str(max(int(v.version) for v in versions))

# Tag the new version as a candidate for review
client.set_registered_model_alias(
    name=MODEL_NAME,
    alias="candidate",
    version=latest_version,
)
client.set_model_version_tag(
    name=MODEL_NAME,
    version=latest_version,
    key="fine_tuned_on",
    value="gold.support_conversations",
)
client.set_model_version_tag(
    name=MODEL_NAME,
    version=latest_version,
    key="eval_loss",
    value=str(round(eval_results["eval_loss"], 4)),
)

# After human review / automated eval gates pass → promote to staging
client.set_registered_model_alias(
    name=MODEL_NAME,
    alias="staging",
    version=latest_version,
)

# After integration tests pass → promote to champion (production)
client.set_registered_model_alias(
    name=MODEL_NAME,
    alias="champion",
    version=latest_version,
)

# Load the model by alias, which decouples code from version numbers
champion_model = mlflow.transformers.load_model(f"models:/{MODEL_NAME}@champion")
```

Serving With Databricks Model Serving

```python
# ── Deploy to Databricks Model Serving ────────────────────────────────────────
# Can also be done via the UI: Models > Serving > Create Endpoint
import json
import requests

WORKSPACE_URL = "https://<your-workspace>.azuredatabricks.net"
TOKEN = dbutils.secrets.get("prod-scope", "databricks-token")

endpoint_config = {
    "name": "support-intent-classifier",
    "config": {
        "served_models": [
            {
                "name": "mistral-7b-lora-champion",
                "model_name": MODEL_NAME,
                "model_version": latest_version,
                "workload_size": "Small",       # smallest provisioned-concurrency tier
                "scale_to_zero_enabled": True,
                "workload_type": "GPU_LARGE",   # A100-class GPU; available types vary by cloud and region
            }
        ],
        "traffic_config": {
            "routes": [
                {"served_model_name": "mistral-7b-lora-champion", "traffic_percentage": 100}
            ]
        },
        "auto_capture_config": {
            "catalog_name": CATALOG,
            "schema_name": "ml",
            "table_name_prefix": "support_classifier_inference_log",
            "enabled": True,  # log all requests/responses to Delta
        },
    },
}

response = requests.post(
    f"{WORKSPACE_URL}/api/2.0/serving-endpoints",
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps(endpoint_config),
)
print(response.json())

# ── Query the endpoint ────────────────────────────────────────────────────────
def classify_intent(message: str) -> str:
    payload = {
        # Text-generation models take a list of prompt strings under "inputs"
        "inputs": [f"[INST] Classify the intent of this support message:\n\n{message} [/INST]"],
        "params": {"max_new_tokens": 50, "temperature": 0.1},
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/support-intent-classifier/invocations",
        headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
        data=json.dumps(payload),
    )
    resp.raise_for_status()
    return resp.json()["predictions"][0]

print(classify_intent("My order hasn't arrived and it's been 10 days"))
# → "shipping_delay"
```

Comparing Fine-Tuning Strategies

| Strategy | GPU Memory | Training Time | Quality vs. Full FT | When to Use |
|---|---|---|---|---|
| Full Fine-Tuning | Very High (80GB+) | Slowest | Baseline (100%) | Max quality, large budget |
| LoRA | Medium (24–40GB) | Fast | ~95% | Best general-purpose choice |
| QLoRA (4-bit + LoRA) | Low (10–16GB) | Medium | ~90–93% | Single GPU, cost-sensitive |
| Prefix Tuning | Low | Very Fast | ~80–85% | Minimal compute, quick iteration |
| Prompt Tuning | Very Low | Fastest | ~70–80% | Base weights untouched; only soft prompts are trained |
| RLHF / DPO | High | Slowest | Best alignment | Instruction-following quality |
| Distillation | Medium (teacher) | Medium | Varies | Smaller, faster inference model |

Rule of thumb: Start with QLoRA on a single GPU. If eval loss stagnates or quality gates fail, move to LoRA on multi-GPU. Full fine-tuning is only warranted when you have >1M high-quality labeled examples and a measurable business case for the incremental quality gain.
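The GPU Memory column can be sanity-checked with rough arithmetic. The sketch below uses common rule-of-thumb byte counts per parameter, which are assumptions rather than measurements: about 16 bytes per parameter for full fine-tuning with mixed-precision AdamW (2-byte weights, 2-byte gradients, a 4-byte fp32 master copy, and two 4-byte Adam moments), about 2 bytes per frozen bf16 parameter for LoRA, and about 0.5 bytes per 4-bit parameter for QLoRA. The 7.24B base size is an assumed figure for a Mistral-7B-class model. Activations, the KV cache, and framework overhead are ignored, so real usage is higher.

```python
GIB = 1024 ** 3

# Rule-of-thumb bytes per parameter (assumptions; excludes activations and overhead)
BYTES_FULL_FT = 16        # bf16 weights + bf16 grads + fp32 master copy + two fp32 Adam moments
BYTES_FROZEN_BF16 = 2     # frozen base weights in bf16 (LoRA)
BYTES_FROZEN_4BIT = 0.5   # frozen base weights in 4-bit NF4 (QLoRA), ignoring quantization constants
BYTES_TRAINABLE = 16      # adapters are trained like full parameters


def estimate_gib(strategy: str, base_params: float, adapter_params: float = 0.0) -> float:
    """Rough weight + gradient + optimizer memory in GiB for one model replica."""
    if strategy == "full":
        total = base_params * BYTES_FULL_FT
    elif strategy == "lora":
        total = base_params * BYTES_FROZEN_BF16 + adapter_params * BYTES_TRAINABLE
    elif strategy == "qlora":
        total = base_params * BYTES_FROZEN_4BIT + adapter_params * BYTES_TRAINABLE
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return total / GIB


BASE = 7.24e9     # assumed parameter count for a Mistral-7B-class model
ADAPTER = 6.8e6   # LoRA r=16 on q_proj/v_proj

for name in ("full", "lora", "qlora"):
    print(f"{name:>5}: ~{estimate_gib(name, BASE, ADAPTER):.1f} GiB before activations")
```

The gap between these floor estimates and the table's ranges is mostly activation memory, which grows with batch size and sequence length.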
Key Takeaways

- Spark handles data at scale before training even begins: filtering, prompt formatting, and splitting across millions of records in minutes.
- QLoRA (LoRA on a 4-bit quantized base model) makes fine-tuning 7B–13B models feasible on a single A100, reducing the memory footprint by ~70% with minimal quality loss.
- Setting `report_to="mlflow"` gives you automatic experiment tracking with no extra logging code: every loss curve, gradient norm, and learning rate schedule is captured.
- Unity Catalog model aliases (candidate → staging → champion) replace brittle version-number references in deployment code, making promotions and rollbacks a one-liner.
- Auto Capture on Databricks Model Serving logs every inference request and response to a Delta table, giving you a feedback loop for building your next training dataset.
- Horovod on Spark is the right tool when single-node training exceeds your SLA: it leverages your existing Spark cluster without a separate orchestration layer.

References

- Databricks: LLM Fine-Tuning on Databricks
- MLflow: Transformers Flavor Documentation
- Hugging Face PEFT: LoRA & QLoRA
- Dettmers et al. (2023), "QLoRA: Efficient Finetuning of Quantized LLMs"
- Hu et al. (2021), "LoRA: Low-Rank Adaptation of Large Language Models"
- Databricks: Model Serving (Foundation Model APIs)
- Horovod on Spark: Official Documentation
- Databricks: HorovodRunner API
- Databricks: Inference Tables (Auto Capture)
- OpenAI (2022), "Training language models to follow instructions with human feedback" (InstructGPT / RLHF)
Data Pipeline Observability: Why Your AI Model Fails in Production


By Abhilash Rao Mesala
The 3:00 AM Incident That Changed Everything

It was a Tuesday morning when the alerts started firing. Our recommendation engine, the one that drives 30% of our revenue, had tanked. Accuracy dropped from 94% to 58% overnight.

The data science team immediately blamed the model. They started tweaking hyperparameters, retraining on new data, and running diagnostics. Nothing worked.

I got pulled into the war room at 3:00 AM. The first thing I asked wasn't "What's wrong with the model?" It was "What changed in the data pipeline?"

It turned out that everything had. A vendor had pushed a schema change upstream. A field that used to be required became optional. Null values started flowing through our pipeline. Our feature engineering code didn't handle nulls gracefully; it just propagated them downstream. By the time the data reached the model, 40% of our feature vectors were corrupted.

The model wasn't broken. The data was.

We spent six hours manually rolling back the schema change, re-running the pipeline, and restoring service. The incident report was brutal: "A lack of data validation meant a breaking change was caught too late."

That's when I realized we needed observability in our data pipeline, not just in our models.

The Problem: Data Quality Is Invisible Until It Breaks

Here's the uncomfortable truth about data pipelines: they fail silently. Your ETL job completes successfully. Your Spark cluster finishes transformations. Your data warehouse loads without errors. Everything looks green in the monitoring dashboard. But the data itself? Garbage in, garbage out.

There are three categories of failures that break AI models in production:

- Missing Values: A source system stops populating a field. Your pipeline doesn't validate it. The model gets NaN values it never saw during training. Predictions become random noise.
- Schema Changes: An upstream team adds a new column, renames an existing one, or changes data types. Your pipeline doesn't expect these changes. Either it crashes or, worse, it silently maps data to the wrong columns.
- Distribution Shifts: The statistical properties of your data change. A field that was always between 0 and 100 suddenly has values of 50,000. Your model's scaling assumptions break. Predictions become nonsensical.

None of these show up in traditional infrastructure monitoring. Your CPU is fine. Memory is fine. Network is fine. But your data is on fire.

The Solution: Observability at Every Layer

I started building a three-layer observability framework using dbt, Great Expectations, and custom validation logic. The goal was simple: catch data quality issues before they reach the model.

Layer 1: dbt Tests (The First Line of Defense)

dbt tests are your cheapest, fastest way to catch obvious data quality issues. They run after every transformation and fail the entire pipeline if something's wrong. Here's what we implemented:

```yaml
# models/staging/stg_user_events.yml
version: 2

models:
  - name: stg_user_events
    columns:
      - name: user_id
        tests:
          - not_null
          # Assumes one row per user; for an event-grain table, test uniqueness on the event key instead
          - unique
      - name: event_timestamp
        tests:
          - not_null
          # At column level, dbt_utils prepends the column name to the expression
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp()"
      - name: event_value
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "> 0"
```

These tests are simple but powerful. They catch:

- Missing required fields (not_null)
- Duplicate records (unique)
- Impossible values (event_timestamp in the future)
- Out-of-range values (non-positive event values)

We run these tests on every pipeline run. If any test fails, the pipeline stops. No data reaches the model. No silent corruption.

The beauty of dbt tests is that they're version-controlled, documented, and part of your transformation code. When a schema change happens, you update the test, commit it, and everyone knows what changed.

Layer 2: Great Expectations (The Statistical Validator)

dbt tests catch structural issues. Great Expectations catches statistical anomalies, the subtle shifts that break models.
Here's a real scenario: our user_age column had a distribution of 18-65 for two years. Then one day, we started getting ages of 200, 500, 1000. A data entry bug upstream. Our dbt tests wouldn't catch this because the values are technically valid integers. But Great Expectations would.

```python
# great_expectations/expectations/user_events_expectations.py
# Uses the legacy (0.x) Great Expectations API
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import DataContext

context = DataContext()

suite = context.create_expectation_suite(
    expectation_suite_name="user_events_suite",
    overwrite_existing=True,
)

validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="my_spark_datasource",
        data_connector_name="default_runtime_data_connector",
        data_asset_name="user_events",
        # A runtime batch request must be handed the data itself; user_events_df
        # is the Spark DataFrame produced upstream
        runtime_parameters={"batch_data": user_events_df},
        # Keys must match the batch_identifiers configured on the runtime data connector
        batch_identifiers={"default_identifier_name": "user_events_run"},
    ),
    expectation_suite_name="user_events_suite",
)

# Expect user_age to be between 18 and 120
validator.expect_column_values_to_be_between(
    column="user_age",
    min_value=18,
    max_value=120,
)

# Expect event_value to have a mean between 50 and 200
validator.expect_column_mean_to_be_between(
    column="event_value",
    min_value=50,
    max_value=200,
)

# Expect less than 5% missing values in critical columns
validator.expect_column_values_to_not_be_null(
    column="user_id",
    mostly=0.95,
)

# Expect the event_type distribution to stay close to historical patterns.
# The weights are placeholders; derive them from a trusted historical window.
validator.expect_column_kl_divergence_to_be_less_than(
    column="event_type",
    partition_object={"values": ["click", "view", "purchase"], "weights": [0.6, 0.3, 0.1]},
    threshold=0.1,
)

validator.save_expectation_suite(discard_failed_expectations=False)
```

Great Expectations runs after dbt tests. It validates:

- Value ranges (age between 18 and 120)
- Statistical properties (mean event value between 50 and 200)
- Null rates (less than 5% missing in critical columns)
- Distribution shifts (event_type distribution matches historical patterns)

If Great Expectations detects an anomaly, it alerts us. We investigate before the data reaches the model.
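To make the distribution-shift check less of a black box, here is a minimal pure-Python sketch of the idea behind a KL-divergence expectation: compare observed category frequencies against a historical baseline and flag the batch when the divergence exceeds a threshold. The baseline weights, the smoothing constant, the sample batches, and the 0.1 threshold are illustrative assumptions, and Great Expectations' own implementation may differ in details such as the direction of the divergence and how unseen categories are handled.

```python
import math
from collections import Counter
from typing import Dict, Iterable


def category_frequencies(values: Iterable[str]) -> Dict[str, float]:
    """Relative frequency of each category in a batch."""
    counts = Counter(values)
    total = sum(counts.values())
    return {k: c / total for k, c in counts.items()} if total else {}


def kl_divergence(observed: Dict[str, float], baseline: Dict[str, float], eps: float = 1e-6) -> float:
    """KL(observed || baseline) in nats over the union of categories.

    eps stands in for a zero baseline probability, so a category never seen
    historically produces a large but finite penalty instead of infinity.
    """
    total = 0.0
    for k in set(observed) | set(baseline):
        p = observed.get(k, 0.0)
        q = max(baseline.get(k, 0.0), eps)
        if p > 0:
            total += p * math.log(p / q)
    return total


def check_distribution(values: Iterable[str], baseline: Dict[str, float], threshold: float = 0.1) -> dict:
    divergence = kl_divergence(category_frequencies(values), baseline)
    return {"passed": divergence < threshold, "kl_divergence": divergence, "threshold": threshold}


# Illustrative historical baseline for event_type (assumed values)
BASELINE = {"click": 0.6, "view": 0.3, "purchase": 0.1}

normal_day = ["click"] * 58 + ["view"] * 31 + ["purchase"] * 11
click_flood = ["click"] * 95 + ["view"] * 4 + ["purchase"] * 1  # hypothetical anomaly

print(check_distribution(normal_day, BASELINE))
print(check_distribution(click_flood, BASELINE))
```

The same result dict shape as the custom validators in the next layer (`passed` plus details) makes it easy to feed such a check into one alerting path.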
Layer 3: Custom Validation (The Domain Expert)

dbt and Great Expectations are generic. Your domain is specific. We added custom validation logic that understands our business.

```python
# pipelines/validation/custom_validators.py
from datetime import datetime

import pandas as pd


def validate_feature_engineering(df: pd.DataFrame) -> dict:
    """
    Custom validation for features before they reach the model.
    Returns a dict of validation results.
    """
    results = {}

    # Validate 1: Feature completeness
    # At least 95% of feature values must be populated
    feature_cols = [col for col in df.columns if col.startswith('feature_')]
    cells = len(df) * len(feature_cols)
    # With no rows or no feature columns, treat the batch as fully missing
    null_rate = df[feature_cols].isnull().sum().sum() / cells if cells else 1.0
    results['feature_completeness'] = {
        'passed': bool(null_rate < 0.05),
        'null_rate': float(null_rate),
        'threshold': 0.05,
    }

    # Validate 2: Feature scaling
    # After normalization, most values fall within about ±3 standard deviations;
    # the ±10 bound is deliberately looser so it flags only gross scaling errors
    for col in feature_cols:
        max_val = df[col].max()
        min_val = df[col].min()
        results[f'{col}_scaling'] = {
            'passed': bool(max_val < 10 and min_val > -10),
            'max': max_val,
            'min': min_val,
        }

    # Validate 3: Temporal freshness
    # The most recent event should be less than 30 days old
    if 'event_date' in df.columns:
        event_dates = pd.to_datetime(df['event_date'])  # avoid mutating the caller's DataFrame
        days_old = (datetime.now() - event_dates.max()).days
        results['temporal_freshness'] = {
            'passed': days_old < 30,
            'days_old': days_old,
            'threshold_days': 30,
        }

    # Validate 4: Business logic
    # Revenue should never be negative
    if 'revenue' in df.columns:
        negative_revenue = int((df['revenue'] < 0).sum())
        results['business_logic_revenue'] = {
            'passed': negative_revenue == 0,
            'negative_count': negative_revenue,
        }

    return results


def validate_and_alert(df: pd.DataFrame, validation_results: dict) -> bool:
    """
    Check all validations and alert if any fail.
    Returns True if all pass, False otherwise.
    """
    all_passed = True
    for check_name, check_result in validation_results.items():
        if not check_result['passed']:
            all_passed = False
            print(f"ALERT: {check_name} failed")
            print(f"Details: {check_result}")
            # Send to monitoring system (Datadog, New Relic, etc.)
            # send_alert(check_name, check_result)
    return all_passed
```

This custom validation runs after Great Expectations. It checks:

- Feature completeness (95% of features populated)
- Feature scaling (normalized features in the expected range)
- Temporal freshness (data is recent)
- Business logic (revenue is never negative)

If any check fails, we block the pipeline and alert the team.

The Real-World Gotchas We Discovered

Gotcha 1: Validation Overhead

Running dbt tests, Great Expectations, and custom validation on every pipeline run adds latency. We went from 15-minute runs to 25-minute runs. The trade-off was worth it (catching a single data quality issue saved us more time than we lost), but you need to plan for it.

Gotcha 2: False Positives

Great Expectations' distribution shift checks are sensitive. Legitimate business changes (for example, a marketing campaign shifting the user_age distribution) triggered false alerts. We had to tune thresholds carefully and add context to alerts.

Gotcha 3: Schema Changes Are Sneaky

A vendor added a new column to an upstream table. Our pipeline didn't break; it just ignored the new column. But the data science team expected it. We added schema validation to catch new columns and alert us.

Gotcha 4: Null Handling Varies

Python represents a missing value as None (and pandas also uses NaN and NaT), SQL uses NULL, and Spark uses null. When data flows between systems, nulls get lost or misinterpreted. We had to standardize null handling across the entire pipeline.
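Gotchas 3 and 4 both come down to small, explicit checks. The sketch below shows one way to write them in plain Python: a schema comparison that reports added, missing, and retyped columns against an expected contract, and a normalizer that maps the different spellings of "missing" to a single None. The expected-schema dictionary, its type names, and the set of textual null markers are hypothetical; adapt them to your own data contracts.

```python
import math
from typing import Any, Dict

# Hypothetical contract for stg_user_events: column name -> expected type name
EXPECTED_SCHEMA = {
    "user_id": "string",
    "event_timestamp": "timestamp",
    "event_value": "double",
}

# Assumed textual null markers seen in upstream exports
NULL_MARKERS = {"", "null", "none", "nan", "n/a"}


def diff_schema(expected: Dict[str, str], actual: Dict[str, str]) -> dict:
    """Compare an actual schema to the expected one and report every kind of drift."""
    added = sorted(set(actual) - set(expected))
    missing = sorted(set(expected) - set(actual))
    retyped = sorted(col for col in set(expected) & set(actual) if expected[col] != actual[col])
    return {
        "passed": not (added or missing or retyped),
        "added": added,
        "missing": missing,
        "retyped": retyped,
    }


def normalize_null(value: Any) -> Any:
    """Map None, float NaN, and textual null markers to a single None."""
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, str) and value.strip().lower() in NULL_MARKERS:
        return None
    return value


def normalize_row(row: Dict[str, Any]) -> Dict[str, Any]:
    return {k: normalize_null(v) for k, v in row.items()}


# A new upstream column plus a silently changed type
actual = {"user_id": "string", "event_timestamp": "string", "event_value": "double", "campaign_id": "string"}
print(diff_schema(EXPECTED_SCHEMA, actual))
print(normalize_row({"user_id": "NULL", "event_value": float("nan"), "country": "US"}))
```

A failing `diff_schema` result can be routed through the same alerting path as the other checks; pandas-specific markers such as `pd.NA` and `NaT` would need extra cases in `normalize_null`.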
The Framework: A Decision Matrix

Here's how we decide which validation layer to use:

| Issue Type | Caught By | Example | Action |
|---|---|---|---|
| Missing required field | dbt tests | user_id is null | Fail pipeline immediately |
| Duplicate records | dbt tests | Same user_id appears twice | Fail pipeline immediately |
| Impossible values | dbt tests | event_timestamp in future | Fail pipeline immediately |
| Out-of-range values | Great Expectations | age > 150 | Alert, investigate, fail if severe |
| Distribution shift | Great Expectations | event_value mean changes 50% | Alert, investigate, continue if acceptable |
| Business logic violation | Custom validation | revenue is negative | Alert, investigate, fail |
| Schema change | Custom validation | New column added upstream | Alert, investigate, update tests |

The Results: From Chaos to Confidence

After implementing this three-layer framework:

- Incident reduction: We went from 2-3 data quality incidents per month to zero over six months.
- Time to resolution: When issues do occur, we catch them within minutes instead of hours.
- Model stability: Model accuracy stopped fluctuating. It's now consistently 93-95%.
- Team confidence: Data scientists trust the data. Engineers trust the pipeline.

The best part? We caught a similar upstream schema change before it caused an incident. Great Expectations detected the distribution shift, we investigated, found the upstream change, and coordinated with the vendor team before any bad data reached production.

Getting Started: The Minimal Viable Observability

You don't need to implement everything at once. Start here:

- Week 1: Add dbt tests for not_null and unique on critical columns.
- Week 4: Set up alerting so you're notified when validations fail.

That's it. You now have observability in your data pipeline.

Conclusion: Observability Saves Models

Your AI model isn't failing because it's bad. It's failing because the data feeding it is bad. And you won't know the data is bad until you look.
The best models in the world can't save you from garbage data. But good observability can. dbt tests, Great Expectations, and custom validation aren't fun. They don't make it into conference talks. But they'll save your production system at 3:00 AM.

Start small. Test early. Validate often.
Implementing Asynchronous Communication Between Microservices Using Kafka and Spring Boot
By Mallikharjuna Manepalli
Data Governance Checklist for AI-Driven Systems
By Abhishek Gupta (DZone Core)
The Real-Time Revolution: Why Blockchain Needs Data Stream Processing
By Gautam Goswami (DZone Core)
Parallel Kafka Batch Processing With Kotlin Coroutines in Spring Boot

Managing high-volume message traffic in distributed architectures is crucial, and so is using database and CPU resources efficiently. Spring Kafka lets us receive messages in batches through its `BatchMessageListener`, but the messages in each batch are often still processed one after another, which becomes a sequential bottleneck.

This article discusses how to use Kotlin Coroutines to remove that bottleneck. We will examine how to maximize Kafka message processing throughput using Structured Concurrency principles and Resource Throttling techniques.

Architectural Bottleneck: Sequential I/O Blocking

In the current Kafka listener, the database or external service calls made for each message add up and directly increase total processing time. If messages are processed more slowly than they arrive and the time between polls exceeds `max.poll.interval.ms`, the consumer is removed from the consumer group. A rebalance is triggered, and that consumer's partitions are redistributed to the other consumers in the group.

```kotlin
@KafkaListener(topics = ["usage-pool-topic"])
fun usagePoolListener(records: List<ConsumerRecord<String, String>>) {
    records.forEach { record ->
        processRecord(record) // Network latency + DB I/O, one record at a time
    }
}
```

Solution

1. Batch-Fetch and In-Memory Map Structure

Before any concurrent code runs, the data needed from all related entities is fetched collectively: instead of one query per message, a single batch query is issued before processing begins. This solves the N+1 query problem at the application layer. All data is loaded once, before the work is split into concurrent operations, which significantly reduces our reliance on the database. Using the `associateBy` function, we turn each result list into a map with O(1) lookups by key, so concurrent operations read from these in-memory maps instead of each querying the database.
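To see the N+1 difference without a Spring setup, here is a language-neutral sketch in Python using the standard-library sqlite3 module, shown before the Kotlin version: it loads every referenced row with one `IN (...)` query and indexes the rows in a dict, which plays the role of Kotlin's `associateBy`. The `usage_pool` table, its columns, and the sample messages are hypothetical stand-ins for the article's entities; a statement trace confirms that only one query reaches the database.

```python
import sqlite3
from typing import Dict, List, Tuple


def fetch_usage_pools(conn: sqlite3.Connection, pool_ids: List[int]) -> Dict[int, Tuple[int, int]]:
    """Load every needed row in ONE query and index it by id (like associateBy)."""
    unique_ids = sorted(set(pool_ids))
    if not unique_ids:
        return {}
    placeholders = ",".join("?" for _ in unique_ids)
    rows = conn.execute(
        f"SELECT usage_pool_id, remaining FROM usage_pool WHERE usage_pool_id IN ({placeholders})",
        unique_ids,
    ).fetchall()
    return {row[0]: row for row in rows}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE usage_pool (usage_pool_id INTEGER PRIMARY KEY, remaining INTEGER)")
conn.executemany("INSERT INTO usage_pool VALUES (?, ?)", [(1, 100), (2, 50), (3, 0)])
conn.commit()

# A batch of messages referencing pools, with a repeat and an unknown id
messages = [{"usage_pool_id": 1}, {"usage_pool_id": 3}, {"usage_pool_id": 1}, {"usage_pool_id": 9}]

executed: List[str] = []
conn.set_trace_callback(executed.append)  # record every SQL statement sent to SQLite
pools = fetch_usage_pools(conn, [m["usage_pool_id"] for m in messages])
conn.set_trace_callback(None)

print(f"queries issued: {len(executed)}")
# Per-message lookups now hit the dict, not the database
for m in messages:
    print(m["usage_pool_id"], pools.get(m["usage_pool_id"]))
```

Deduplicating the ids before querying keeps the `IN` list short when many messages reference the same entity.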
Kotlin

    val messages = records.map { objectMapper.readValue(it.value(), UsagePoolRecord::class.java) }

    val usagePoolEntities = usagePoolRepository
        .findByIds(messages.map { it.usagePoolId.toBigInteger() })
        .associateBy { it.usagePoolId }

    val lockEntities = lockRepository
        .findByUserIds(messages.map { it.userId })
        .associateBy { it.userId }

2. Structured Concurrency

Memory Management With Chunking

Chunking serves two purposes:

- It prevents coroutines for the entire batch from being created at the same time, which avoids unnecessary memory usage.
- Each chunk writes to the database only after all of its coroutines have finished, which avoids unnecessary connection pool consumption.

Kotlin

    messages.chunked(150).forEach { chunk ->
        // Each chunk of 150 records is processed concurrently
    }

Resource Isolation With limitedParallelism

Why limitedParallelism? If the database connection pool has, for example, X connections, keeping the parallelism limit below X prevents "Connection Timeout" errors. Every call to limitedParallelism returns a new view of the dispatcher with its own limit. The limited dispatcher must therefore be created once and shared. Calling it inside the loop would give each record a separate limit of 15 and no global cap at all.

Kotlin

    // Created once (e.g., as a class property) so that all records share one limit.
    private val dbDispatcher = Dispatchers.IO.limitedParallelism(15)

    // Inside runBlocking { ... } in the listener:
    messages.chunked(150).forEach { chunk ->
        val deferredResults = chunk.map { record ->
            async(dbDispatcher) { // child of runBlocking, not a detached CoroutineScope
                try {
                    processRecord(record, usagePoolEntities, lockEntities)
                } catch (e: Exception) {
                    log.error("Operation error: ${record.key()}", e)
                    buildErrorRecord(record, e)
                }
            }
        }
        val results = deferredResults.awaitAll() // Structured waiting
        collectAndAggregate(results)
    }

- Dispatchers.IO.limitedParallelism(X) caps the number of coroutines running in parallel on that view at X, which keeps the DB connection pool from being exhausted.
- Each coroutine returns its result through async.
- awaitAll() waits for every coroutine in the chunk to finish before proceeding to the next step.

runBlocking

runBlocking blocks the calling thread until all the concurrent operations inside it are complete. That is the correct approach here because:

- It keeps the Kafka consumer thread blocked, preserving the offset commit behavior, until every record in the batch has been processed.
- We still benefit from concurrent execution inside the runBlocking block.

3. Thread-Safe Result Structure

After awaitAll(), all results are collected in thread-safe queues, and then a single batch write takes place. Combining the results of coroutines that run in parallel into a MutableList can lead to data loss, so lock-free data structures are preferable here. ConcurrentLinkedQueue uses CAS (Compare-And-Swap) operations instead of synchronized blocks, which gives it superior performance for high-contention writes.

Why ConcurrentLinkedQueue Instead of MutableList?

Concurrent operations write to a shared result collection at the same time. Using a MutableList leads to race conditions, whereas ConcurrentLinkedQueue handles safe concurrent writes well.

Kotlin

    data class AggregatedRecords(
        val processedSave: ConcurrentLinkedQueue<ProcessedEntity> = ConcurrentLinkedQueue(),
        val toDelete: ConcurrentLinkedQueue<UsagePoolEntity> = ConcurrentLinkedQueue(),
        val retryQueue: ConcurrentLinkedQueue<RetryEntity> = ConcurrentLinkedQueue()
    )

Handling DataIntegrityViolationException is important. When two consumer instances process the same record, one of them hits a unique constraint violation. Instead of failing the entire batch, the code falls back to saving the records one by one.

Kotlin

    aggregatedRecords.processedSave
        .chunked(150)
        .forEach { batch ->
            try {
                processedRepository.saveAll(batch)
            } catch (e: DataIntegrityViolationException) {
                batch.forEach { record ->
                    try {
                        processedRepository.save(record)
                    } catch (innerException: DataIntegrityViolationException) {
                        // Already written by another consumer instance: skip it
                    }
                }
            }
        }

4. Error Tolerance in Write Operations

Batch writes (saveAll) are efficient. However, a "Unique Constraint" error on a single record can cause the entire batch to fail. The following structure is critical for meeting Optimistic Locking or Idempotency requirements.
Kotlin

    aggregatedRecords.processedSave.chunked(150).forEach { batch ->
        try {
            processedRepository.saveAll(batch)
        } catch (e: DataIntegrityViolationException) {
            // Fallback: Try one by one if batch fails
            batch.forEach { record ->
                try {
                    processedRepository.save(record)
                } catch (innerException: DataIntegrityViolationException) {
                    log.warn("Duplicate record skipped: ${record.id}")
                }
            }
        }
    }

5. Data Flow Diagram

- Ingress: The Kafka batch is received inside runBlocking.
- Preparation: All necessary context data is retrieved from the DB in bulk.
- Execution: Coroutines are started asynchronously, chunk by chunk.
- Synchronization: awaitAll() acts as a barrier that waits for every coroutine in the chunk to complete.
- Egress: The collected results are persisted with saveAll.

Performance Analysis and Results

Conclusion

Processing Kafka messages in Spring Boot with Kotlin Coroutines not only increases speed but also improves code readability and makes resource management deterministic (predictable). runBlocking builds a bridge between the blocking Kafka consumer thread and the world of suspending functions without disrupting Kafka's offset management mechanism.

Dependencies

XML

    <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-coroutines-core</artifactId>
        <version>1.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
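The Ingress, Execution, Synchronization and Egress steps described in this article can be sketched in plain Python. The sketch is language-neutral and makes some substitutions:

- A single ThreadPoolExecutor with a fixed max_workers stands in for the shared limited dispatcher, so the cap holds across all chunks.
- Waiting on every future of a chunk stands in for awaitAll().
- queue.SimpleQueue (thread-safe) stands in for ConcurrentLinkedQueue.

DuplicateKeyError and FakeProcessedRepository are hypothetical stand-ins for DataIntegrityViolationException and a Spring Data repository.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import SimpleQueue


class DuplicateKeyError(Exception):
    """Hypothetical stand-in for a unique-constraint violation."""


class FakeProcessedRepository:
    """Hypothetical in-memory table with a unique constraint on the record id."""

    def __init__(self, existing_ids=()):
        self._ids = set(existing_ids)
        self._lock = threading.Lock()

    def save_all(self, batch):
        with self._lock:
            if len(set(batch)) != len(batch) or any(rid in self._ids for rid in batch):
                raise DuplicateKeyError("batch rejected")  # all-or-nothing, like a failed saveAll
            self._ids.update(batch)

    def save(self, record_id):
        with self._lock:
            if record_id in self._ids:
                raise DuplicateKeyError(record_id)
            self._ids.add(record_id)

    @property
    def ids(self):
        with self._lock:
            return set(self._ids)


def _drain(q):
    items = []
    while not q.empty():
        items.append(q.get())
    return items


def _run_one(process, record_id, processed, failed):
    try:
        process(record_id)
        processed.put(record_id)
    except Exception:  # isolate per-record failures, like the try/catch inside async
        failed.put(record_id)


def _write_with_fallback(repo, batch):
    try:
        repo.save_all(batch)
        return []
    except DuplicateKeyError:
        skipped = []
        for rid in batch:  # fallback: save one by one, skipping duplicates
            try:
                repo.save(rid)
            except DuplicateKeyError:
                skipped.append(rid)
        return skipped


def process_batch(record_ids, process, repo, chunk_size=150, max_parallel=15):
    processed_all, skipped, failed = [], [], SimpleQueue()
    # One bounded pool for the whole batch, created once.
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        for start in range(0, len(record_ids), chunk_size):
            chunk = record_ids[start:start + chunk_size]
            processed = SimpleQueue()  # workers append to it concurrently
            futures = [pool.submit(_run_one, process, rid, processed, failed) for rid in chunk]
            for future in futures:
                future.result()  # barrier: the whole chunk finishes before its write
            to_save = sorted(_drain(processed))  # sorted only for deterministic output
            skipped += _write_with_fallback(repo, to_save)
            processed_all += to_save
    return processed_all, sorted(_drain(failed)), skipped


# Usage: id 3 was already written by another consumer; id 7 fails processing.
repo = FakeProcessedRepository(existing_ids={3})

def flaky_process(record_id):
    if record_id == 7:
        raise ValueError("bad payload")

processed, failed, skipped = process_batch(
    list(range(10)), flaky_process, repo, chunk_size=4, max_parallel=3)
print(processed)  # [0, 1, 2, 3, 4, 5, 6, 8, 9]
print(failed)     # [7]
print(skipped)    # [3]
```

A failed record does not stop its chunk, and a duplicate only costs its own chunk a row-by-row retry. The rest of the batch is unaffected.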

By Erkin Karanlık
From ETL to Lakeflow: Shifting to a Declarative Data Paradigm

If you've worked on a data platform for more than a few years, you've almost certainly built the same pipeline twice. First, the way the team wrote pipelines in 2019: a notebook here, a Python script there, an Airflow DAG to glue it all together, and a long document explaining the order things had to run in. Then the rewrite, two years later, when somebody quit and nobody could remember why a particular task had a sleep(180) in it.

Lakeflow is Databricks' answer to that pattern, and the shift it's pushing for is bigger than the marketing makes it sound. It isn't a new orchestrator. It's a move from imperative pipelines, where you write the steps, to declarative pipelines, where you write the destination and let the engine figure out the steps. What follows is the practical version of that shift: what's actually different, where the gains are real, and how to migrate without ending up with a half-converted lakehouse.

1. The Imperative ETL Trap: Why Traditional Pipelines Are Hitting a Wall

Imperative ETL is a fancy name for the way most pipelines are still written: a sequence of steps, hand-ordered, run on a schedule. It works fine until it doesn't, and the failure modes are remarkably consistent across the teams I've worked with:

- The DAG outgrows its author. The person who wrote the original 30-task Airflow DAG moves teams. The next engineer is afraid to delete anything because they can't tell which tasks are still needed.
- Backfills are surgical operations. Re-running yesterday means manually figuring out which downstream tables are stale, and in what order. Half the team's tribal knowledge lives in Slack threads about backfills.
- Quality checks are bolted on. Data quality lives in a separate framework, often a separate codebase, often run by a separate team. By the time a check fails, the bad data is already in the warehouse.
- Lineage is a slide in a deck. Whatever lineage exists was drawn by hand for a quarterly review and was out of date the day after.
None of these are bugs in the imperative model. They're features of it. When you write the steps, you own the steps, including all the cross-task assumptions the engine doesn't know about.

2. What "Declarative" Actually Means in Lakeflow

Declarative is one of those words that gets used loosely. In Lakeflow Pipelines, it has a specific, narrow meaning: you describe each table's logical definition (its source query, its expected schema, its quality rules), and the engine determines execution. It picks the order. It decides which tables are streaming and which are batch. It scales the cluster. It figures out incremental processing. It produces lineage automatically, because lineage is now a derived property of the dependency graph it built for you.

What it isn't:

- It isn't "low-code." You're still writing SQL or PySpark. What's gone is the orchestration boilerplate around it.
- It isn't a magic upgrade for any pipeline. Pipelines that genuinely need procedural logic (multi-step API calls with branching, complex pre- and post-processing) still belong in Lakeflow Jobs (the orchestrator) or even in external code called from the pipeline.
- It isn't free. There's a learning curve in stopping yourself from writing the steps you used to write. In the first month, most teams over-specify.

The mental shift: stop describing how the data should flow. Describe what each table is. Lakeflow figures out the flow.

3. The Lakeflow Architecture: Connect, Pipelines, Jobs

Lakeflow consists of three components that share one governance layer (Unity Catalog). They map roughly onto the three traditional layers of a pipeline (ingestion, transformation, orchestration), but with the imperative wiring removed.

Figure 1. Lakeflow's three components on top of Unity Catalog. Pipelines is the declarative core; Connect feeds it, Jobs schedules it.

A few practical points about this picture.
Lakeflow Connect is where managed connectors live (Salesforce, Workday, Postgres CDC, and a steadily growing list). It's the part you reach for instead of writing yet another ingestion script. Lakeflow Pipelines is where the declarative paradigm actually lives; every other component is conventional. Lakeflow Jobs is the part that looks most like Airflow: task graphs, retries, alerts. The trick is that the things inside a Pipelines task aren't tasks themselves. They're table definitions, and the engine builds the internal DAG from their dependencies.

4. Translating an Imperative Pipeline to a Declarative One

The clearest way to feel the difference is to look at the same logic written both ways. Imagine a small bronze→silver→gold pipeline for transactions: ingest raw files, deduplicate, then aggregate to daily totals.

4a. The imperative version (notebook + Airflow style)

Python

    # bronze.py
    df = spark.read.json("s3://landing/txns/")
    df.write.format("delta").mode("append").saveAsTable("bronze.txns")

    # silver.py -- runs after bronze finishes
    raw = spark.table("bronze.txns")
    clean = (raw.dropDuplicates(["txn_id"])
                .filter("amount IS NOT NULL"))
    clean.write.format("delta").mode("overwrite").saveAsTable("silver.txns")

    # gold.py -- runs after silver finishes
    agg = (spark.table("silver.txns")
           .groupBy("ingest_date", "account_id")
           .sum("amount")
           .withColumnRenamed("sum(amount)", "daily_total"))
    agg.write.format("delta").mode("overwrite").saveAsTable("gold.daily_totals")

    # airflow_dag.py -- the part that actually controls execution
    bronze_task >> silver_task >> gold_task

4b. The same logic, declared in a Lakeflow Pipeline

Python

    import dlt
    from pyspark.sql.functions import sum as _sum

    @dlt.table(
        name="bronze_txns",
        comment="Raw transactions landed from S3.",
    )
    def bronze_txns():
        return (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .load("s3://landing/txns/"))

    @dlt.table(name="silver_txns", comment="Deduplicated, validated transactions.")
    @dlt.expect_or_drop("non_null_amount", "amount IS NOT NULL")
    @dlt.expect("non_null_txn_id", "txn_id IS NOT NULL")  # per-row check; uniqueness comes from dropDuplicates
    def silver_txns():
        return (dlt.read_stream("bronze_txns")
                .dropDuplicates(["txn_id"]))

    @dlt.table(name="gold_daily_totals")
    def gold_daily_totals():
        return (dlt.read("silver_txns")
                .groupBy("ingest_date", "account_id")
                .agg(_sum("amount").alias("daily_total")))

Two things vanished in the rewrite. There is no DAG file, because the dependencies are inferred from the dlt.read / dlt.read_stream calls. There is no separate data quality framework; quality lives next to the table definition, where it belongs. The engine decides what's streaming and what's batch from the calls themselves: bronze is a stream, silver is a stream of the bronze stream, and gold is a batch over silver. None of that ordering is in the code I wrote.

5. Quality, Lineage, and Operational Visibility for Free

The expectation decorators above (@dlt.expect, @dlt.expect_or_drop, and the stricter @dlt.expect_or_fail) are not just convenience syntax; they become first-class objects in the pipeline.
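Here is a toy model in plain Python that makes the three expectation modes concrete. It is not the dlt API and does not run on Databricks. Rows are dicts, and rules are Python predicates instead of SQL strings. The names apply_expectations and ExpectationMetrics and the "warn"/"drop"/"fail" labels are hypothetical. The model only mirrors the documented behavior: expect keeps violating rows but records them, expect_or_drop removes them, and expect_or_fail stops processing.

```python
from dataclasses import dataclass


class ExpectationFailed(Exception):
    """Raised for a "fail" rule, mirroring expect_or_fail stopping the update."""


@dataclass
class ExpectationMetrics:
    passed: int = 0
    failed: int = 0


def apply_expectations(rows, expectations):
    """Toy model of expectation semantics (not the dlt API).

    expectations: list of (name, predicate, action) where action is
    "warn" (keep the row), "drop" (remove the row) or "fail" (raise).
    Returns (kept_rows, metrics_by_name, dropped_count).
    """
    metrics = {name: ExpectationMetrics() for name, _, _ in expectations}
    kept, dropped = [], 0
    for row in rows:
        keep = True
        for name, predicate, action in expectations:
            if predicate(row):
                metrics[name].passed += 1
                continue
            metrics[name].failed += 1
            if action == "fail":
                raise ExpectationFailed(f"expectation {name!r} violated by {row}")
            if action == "drop":
                keep = False
        if keep:
            kept.append(row)
        else:
            dropped += 1
    return kept, metrics, dropped


rows = [
    {"txn_id": "t1", "amount": 10.0},
    {"txn_id": "t2", "amount": None},
    {"txn_id": None, "amount": 5.0},
]
expectations = [
    ("non_null_amount", lambda r: r["amount"] is not None, "drop"),  # like expect_or_drop
    ("non_null_txn_id", lambda r: r["txn_id"] is not None, "warn"),  # like expect
]
kept, metrics, dropped = apply_expectations(rows, expectations)
print(dropped)                     # 1
print(metrics["non_null_txn_id"])  # ExpectationMetrics(passed=2, failed=1)
```

The row with a missing txn_id survives, because its rule only warns, but it is still counted as a failure. That is the kind of per-rule count the pipeline's event log exposes.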
Every run produces a per-expectation pass/fail count. You can query it directly from the pipeline's event log, where the counts are stored as JSON in the details column:

SQL

    -- How many rows passed and failed each expectation, per update, last 7 days
    SELECT
      origin.update_id  AS update_id,
      origin.flow_name  AS flow_name,
      e.name            AS expectation_name,
      e.passed_records,
      e.failed_records
    FROM (
      SELECT origin, timestamp,
             explode(from_json(
               details:flow_progress:data_quality:expectations,
               'array<struct<name: string, dataset: string, passed_records: bigint, failed_records: bigint>>'
             )) AS e
      FROM event_log("<pipeline-id>")
      WHERE event_type = 'flow_progress'
        AND timestamp >= current_timestamp() - INTERVAL 7 DAYS
    )
    ORDER BY timestamp DESC;

Lineage shows up automatically in Unity Catalog. It covers both the table-level edges (gold_daily_totals depends on silver_txns) and the column-level edges (gold's daily_total derives from silver's amount). Operationally, this is the change with the largest day-to-day impact. When somebody asks "what does this column mean and where did it come from," you stop having to guess.

What this replaces: Great Expectations runs scheduled separately, OpenLineage stitched together by hand, and a homegrown observability dashboard reading task logs. All three of those projects either go away or shrink dramatically.

6. Migration Strategy: How Teams Actually Move Off Imperative Pipelines

I haven't seen a successful big-bang migration. The pattern that works is layered:

Phase 1: New pipelines only

Make Lakeflow Pipelines the default for any new pipeline. This sounds obvious. The discipline is in saying no when somebody wants to add "just one more" Airflow DAG to the imperative side because it's faster this week.

Phase 2: Convert the painful ones

Pick the existing pipelines that hurt the most: the ones with the longest backfill stories, the most ad hoc quality checks, the worst lineage gaps. Those are the ones where the declarative model pays back the rewrite cost fastest. Don't start with the easy ones; their owners won't thank you for the disruption.

Phase 3: Retire the orchestration boilerplate

Once a critical mass of pipelines has moved over, you can shrink (or in many cases delete) Airflow setups, custom dependency-tracking tools, and the side projects that grew up around imperative ETL.
This is the phase where the cost savings actually show up in headcount and infrastructure bills.

| Migration step | Effort | Watch out for |
|---|---|---|
| New pipelines on Lakeflow | Low | Team momentum: it's easy to revert to old patterns. |
| Convert the top 3 painful pipelines | Medium | Different streaming/batch semantics in expressed dependencies. |
| Move expectations off external DQ tools | Medium | Existing alerting wired to the old framework. |
| Retire imperative orchestrator | High | External callers (BI tools, ML jobs) that triggered DAGs directly. |

7. Where Declarative Still Hurts: Honest Limitations

I'd be lying if I said this was free. The places where the declarative model still bites:

- Procedural logic doesn't fit. If your "pipeline" is really a sequence of API calls with branching error handling, that's a Lakeflow Job (or external code), not a declarative table.
- Cross-pipeline orchestration is its own thing. Lakeflow Pipelines builds the DAG inside a pipeline. If you need pipeline A to wait for pipeline B, you still need Lakeflow Jobs above them.
- Debugging shifts from steps to definitions. When something is wrong, you're not stepping through a script. You're reading the event log and figuring out which expectation or upstream table caused it. The tooling is good; the muscle memory is different.
- Cost can surprise you. Auto-scaling on a misbehaving streaming source carries the same risk it always has. Set max workers thoughtfully on day one; don't leave it to defaults.

Conclusion

The shift to declarative pipelines isn't really about syntax. It's about who owns the boring parts. In an imperative pipeline, the team owns the order, the retries, the lineage, the quality checks, and the cluster scaling, and pays in headcount when any of those break. In a declarative pipeline, those become properties of the engine. The team owns the part that's actually interesting: the table definitions and the business logic.
Lakeflow is the cleanest implementation of that idea I've used in production, and the teams I've watched migrate haven't asked to go back.
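The core move in this article is letting the engine derive execution order from table definitions instead of from a hand-written DAG. That can be illustrated with a toy in plain Python, which is not how Lakeflow is implemented internally. In the toy, each table is a function that reads its inputs through ctx.read(...). A dry run with empty inputs records which tables each function reads. graphlib.TopologicalSorter (Python 3.9+) then orders them. The Pipeline class, the ctx objects and the dry-run trick are all hypothetical, and the trick assumes every table function tolerates empty input.

```python
from graphlib import TopologicalSorter


class _RecordingContext:
    """Dry-run context: remembers which tables are read, returns empty data."""

    def __init__(self):
        self.reads = set()

    def read(self, name):
        self.reads.add(name)
        return []


class _ExecContext:
    """Real-run context: serves results of tables that already ran."""

    def __init__(self, results):
        self._results = results

    def read(self, name):
        return self._results[name]


class Pipeline:
    """Toy declarative pipeline (not the dlt API)."""

    def __init__(self):
        self._defs = {}

    def table(self, fn):
        self._defs[fn.__name__] = fn
        return fn

    def run(self, sources):
        graph = {}
        for name, fn in self._defs.items():
            recorder = _RecordingContext()
            fn(recorder)  # dry run only discovers dependencies
            graph[name] = recorder.reads
        order = list(TopologicalSorter(graph).static_order())
        results = dict(sources)
        for name in order:
            if name in self._defs:  # external sources are already in results
                results[name] = self._defs[name](_ExecContext(results))
        return order, results


pipeline = Pipeline()

# Declared in reverse order on purpose: the order of definitions does not matter.
@pipeline.table
def gold_daily_totals(ctx):
    totals = {}
    for row in ctx.read("silver_txns"):
        key = (row["ingest_date"], row["account_id"])
        totals[key] = totals.get(key, 0) + row["amount"]
    return totals

@pipeline.table
def silver_txns(ctx):
    seen, out = set(), []
    for row in ctx.read("bronze_txns"):
        if row["amount"] is not None and row["txn_id"] not in seen:
            seen.add(row["txn_id"])
            out.append(row)
    return out

@pipeline.table
def bronze_txns(ctx):
    return list(ctx.read("landing"))


landing = [
    {"txn_id": "a", "ingest_date": "2024-05-01", "account_id": 1, "amount": 10},
    {"txn_id": "a", "ingest_date": "2024-05-01", "account_id": 1, "amount": 10},
    {"txn_id": "b", "ingest_date": "2024-05-01", "account_id": 1, "amount": 5},
    {"txn_id": "c", "ingest_date": "2024-05-01", "account_id": 2, "amount": None},
]
order, results = pipeline.run({"landing": landing})
print(order)  # ['landing', 'bronze_txns', 'silver_txns', 'gold_daily_totals']
print(results["gold_daily_totals"])  # {('2024-05-01', 1): 15}
```

No line in the table definitions states the order. It falls out of the reads, which is the property the article attributes to dlt.read and dlt.read_stream.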

By Seshendranath Balla Venkata
Rust-Native Alternatives to Spark SQL and DataFrame Workloads

Apache Spark is one of the most powerful tools in the data and AI engineering world. It processes massive datasets and is widely used across industries, regardless of cloud platform. But when you move from learning Spark to running it in production, you start to see real challenges. What follows comes from practical experience.

1. JVM Overhead

Spark runs on the Java Virtual Machine (JVM). At first, this looks fine, but in real workloads it creates overhead.

What actually happens:

- Extra memory is consumed by the JVM itself.
- Data moves between Python and the JVM (serialization).
- Job startup takes more time.

Why it matters: Even if your logic is simple, the JVM layer adds hidden cost and latency. In PySpark workloads especially, this becomes very noticeable.

2. Garbage Collection (GC) Issues

The JVM uses garbage collection (GC) to manage memory. In small workloads, this is no problem; in large workloads, it can become a big one.

What we generally observe:

- Sudden pauses during execution
- Jobs becoming slow without a clear reason
- Inconsistent performance

The real challenge: we often need to tune memory settings, GC configuration, and executor behavior. Without proper tuning, performance becomes unpredictable.

3. Cluster Complexity

Spark is not just a tool; it is a distributed system. To run it, you must manage infrastructure.

What we need to handle:

- Cluster setup
- Executor and memory configuration
- Partition tuning
- Scaling (up/down)

Impact in real projects: higher infrastructure cost, more operational effort, and a need for deep expertise. All of this adds overhead beyond just writing data pipelines.

Rust Changes Everything

Rust addresses these problems at the language level.

No JVM

Rust compiles directly to machine code, so there is no virtual machine and no VM startup or Python-to-JVM serialization overhead.

No Garbage Collection

Rust uses ownership-based memory management.
- The points at which memory is freed are determined at compile time, not by a runtime collector.
- There are no runtime GC pauses.

Predictable Performance

- Better memory control
- No hidden pauses
- Efficient execution

Result: faster and more stable systems.

When we look at Rust tools, we see different ways to replace parts of Spark:

| Tool | Replaces |
|---|---|
| Polars | DataFrame processing |
| DataFusion | SQL engine |
| Ballista | Distributed execution |
| RisingWave | Streaming |
| Sail | Full Spark replacement |

LakeSail's Sail brings all of these together in one place.

What Is Sail?

Sail is an open-source computation framework that serves as a drop-in replacement for Apache Spark (SQL and DataFrame API) in both single-host and distributed settings. Sail is built in Rust, and its developers report that it runs about 4x faster than Spark while reducing hardware costs by 94%.

In simple terms: Sail = Spark experience + Rust performance + no JVM/GC problems.

It is not just a library. It is a full data platform and compute engine.

Core Idea of Sail

Traditional Spark:

Plain Text

    PySpark → JVM → Spark Engine → Execution

Sail:

Plain Text

    PySpark → Spark Connect → Sail (Rust Engine) → Execution

Key difference: Spark depends on the JVM; Sail removes the JVM completely.

Where Sail Is Strong

- Sail is a good choice if you are already using Apache Spark and want better performance.
- It allows you to keep using the same Spark SQL and DataFrame APIs without rewriting your code.
- It removes JVM and garbage collection overhead, which helps improve speed and memory usage.
- Because it runs on a Rust-native engine, it provides more stable and predictable performance.
- It can help reduce infrastructure cost while keeping your existing development approach.

Where You Should Be Careful

- Sail is still a new technology and not as mature as the Spark ecosystem.
- It has fewer connectors and integrations than Spark, and a smaller community.
- Some advanced Spark features may not be fully supported yet.
- It is important to test Sail with your own workload before using it in production.
Sail runs in two modes:

- Local mode (single machine)
- Cluster mode (Kubernetes)

It includes:

- Task scheduling
- Resource management
- Distributed execution

This is similar to a Spark cluster, but lighter.

Lakehouse Support

Sail supports Delta Lake and Apache Iceberg. That means it works with modern data lakes and is compatible with existing data.

Storage Support

Sail can read from and write to:

- AWS S3
- Azure Data Lake
- Google Cloud Storage
- HDFS
- Local files

So it integrates with existing ecosystems.

Catalog Integration

Sail supports Unity Catalog and the Iceberg REST Catalog, which matters for governance, access control, and enterprise data management.

Multimodal + AI Workloads

Sail goes beyond Spark. It supports:

- Structured data
- Images
- PDFs
- AI workloads

This is called a multimodal lakehouse.

Performance and Cost

Sail claims:

- ~4x faster execution
- Up to 8x in some workloads
- ~94% lower cost

The reasons given are no JVM overhead, no GC, and better memory usage.

Conclusion

Sail is a new way to run Spark workloads using Rust instead of the JVM. It eliminates garbage collection and reduces memory and performance issues, making execution faster and more stable. One of its biggest advantages is that you can keep the same Spark code with little or no change, which helps reduce infrastructure cost and complexity. However, it is still a new technology and not yet as mature as Spark. In the future, the best approach will be to use the right mix of Spark and Rust tools together.

By Srinivasarao Rayankula
Combining Temporal and Kafka for Resilient Distributed Systems

Kafka and Temporal address different failure boundaries, and resilient distributed systems often need both rather than one as a substitute for the other. Kafka is built to move ordered, replayable event streams across many consumers and machines. Temporal is built to keep long-running application logic alive as durable Workflow Executions that recover from crashes, outages, and worker restarts by replaying persisted Event History. The combination becomes compelling when Kafka is used to carry facts and Temporal is used to remember intent, timers, retries, and compensations across the lifetime of a business process.

Kafka as the Event Backbone and Temporal as the Control Plane

Kafka's model is centered on totally ordered partitions, consumer groups, and offsets. Within a subscribing consumer group, each partition is consumed by exactly one consumer at a time. Kafka keeps consumer state compact by treating progress as an offset that can be checkpointed, committed manually, or even rewound for reprocessing. That model is excellent for integration boundaries, stream processing, and decoupling producers from downstream services. What it does not provide by itself is durable orchestration for business logic that must wait for hours, react to multiple messages over time, and recover mid-process without rebuilding state externally. Temporal fills that gap by treating a Workflow Execution as a durable, reliable, scalable function. That function owns local state, receives messages through Signals or Updates, and advances by replaying persisted history instead of starting over from scratch after failure.

Keep Kafka at the Boundary of Workflow Replay

The most important design rule is simple: Kafka client calls do not belong inside Workflow code. Temporal requires deterministic workflow logic on replay, and its documentation explicitly places non-deterministic work, such as API calls and database queries, inside Activities.
A Workflow should behave like a compact state machine that decides what should happen next, while Activities perform the side effects that may fail or need retries. That separation is what allows Kafka to remain an external event fabric without corrupting Temporal replay semantics.

Java

    // Excerpt from the OrderWorkflow implementation. In the Java SDK, @WorkflowMethod and
    // @SignalMethod belong on the @WorkflowInterface; they are shown inline here for brevity.
    private boolean paymentReceived;

    private final OrderActivities activities = Workflow.newActivityStub(
        OrderActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .setRetryOptions(
                RetryOptions.newBuilder()
                    .setInitialInterval(Duration.ofSeconds(1))
                    .setMaximumInterval(Duration.ofSeconds(30))
                    .build())
            .build());

    @WorkflowMethod
    public void process(String orderId) {
        activities.reserveInventory(orderId);
        // Durable timer: returns false if no payment arrives within two hours
        boolean paid = Workflow.await(Duration.ofHours(2), () -> paymentReceived);
        if (!paid) {
            activities.releaseInventory(orderId);
            activities.publishTimedOut(orderId);
            return;
        }
        activities.publishConfirmed(orderId);
    }

    @SignalMethod
    public void paymentCaptured(String paymentId) {
        paymentReceived = true;
    }

This workflow is intentionally boring, which is precisely why it is robust. Inventory reservation and event publication are pushed into Activities, while the workflow itself only keeps state and waits. The two-hour wait is not a sleeping thread in application memory. Temporal persists timers, so the execution resumes even after worker or service interruptions. Kafka, in this pattern, supplies the external payment event, but Temporal owns the long-lived timeout and the recovery semantics.

A thin Kafka bridge can then translate an incoming record into a Temporal message instead of embedding orchestration logic in the consumer loop. Signal-With-Start is especially useful here. If a workflow with the given Workflow ID is already running, it signals that workflow. Otherwise it starts a new one with that ID and immediately applies the signal. This removes a large class of race conditions between creation and update.
Java

    public void onMessage(ConsumerRecord<String, PaymentEvent> record) {
        WorkflowStub workflow = client.newUntypedWorkflowStub(
            "OrderWorkflow",
            WorkflowOptions.newBuilder()
                .setWorkflowId("order-" + record.key())
                .setTaskQueue("order-workflows")
                .build());

        workflow.signalWithStart(
            "paymentCaptured",
            new Object[] { record.value().paymentId() },
            new Object[] { record.key() });

        // Commit only this record's offset (the next offset to read is offset + 1).
        // A bare commitSync() commits the offsets of every record returned by the last poll().
        consumer.commitSync(Map.of(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));
    }

That handoff should be designed as duplicate-tolerant rather than duplicate-impossible. Kafka allows manual control over when a record is considered consumed. Even so, a crash after Temporal accepts the signal and before the offset is committed can still trigger redelivery. A practical way to make that safe has two parts. Keep the Workflow ID stable for the business entity, and make Activities idempotent, because Temporal may retry Activity executions as part of normal failure handling.

Failure Semantics Matter More Than Labels

The most common architectural mistake in Kafka and Temporal systems is to over-claim exactly-once semantics. Kafka's idempotent producer ensures that retries do not create duplicate writes in the stream, and Kafka transactions allow atomic writes across partitions and topics. Kafka Streams goes further by defining end-to-end exactly-once around a very specific boundary. Input topic offsets, state stores, and output topics are committed atomically because they are all inside Kafka's storage model. Temporal, meanwhile, gives an effectively-once scheduling experience for Activities. It still expects Activity implementations to be idempotent, because retries can occur after partial execution or worker failure. The combined system therefore does not become end-to-end exactly-once by default. That only happens when idempotency keys or transactional guarantees explicitly cover every external side effect that matters.
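The duplicate-tolerant handoff above can be modelled in a few lines of plain Python. The model shows why a stable Workflow ID plus an idempotency key makes redelivery and retries harmless. ToyWorkflowService, on_message and IdempotentPublisher are hypothetical in-memory stand-ins, not the Temporal or Kafka APIs. The sketch assumes a single thread. In a real system, the idempotency record has to be stored durably, ideally atomically with the side effect or deduplicated downstream. Otherwise a crash between "publish" and "remember the key" can still produce a duplicate.

```python
from dataclasses import dataclass, field


@dataclass
class OrderState:
    payment_ids: set = field(default_factory=set)


class ToyWorkflowService:
    """Stand-in for signal-with-start semantics: one open execution per Workflow ID."""

    def __init__(self):
        self.executions = {}
        self.starts = 0

    def signal_with_start(self, workflow_id, payment_id):
        state = self.executions.get(workflow_id)
        if state is None:  # no open execution with this ID: start one
            state = self.executions[workflow_id] = OrderState()
            self.starts += 1
        state.payment_ids.add(payment_id)  # a redelivered signal is a no-op


def on_message(service, record_key, payment_id):
    # Stable Workflow ID derived from the business key, as in "order-" + record.key()
    service.signal_with_start(f"order-{record_key}", payment_id)


class IdempotentPublisher:
    """Activity-style side effect guarded by an idempotency key (in memory only)."""

    def __init__(self):
        self.published = []
        self._seen_keys = set()

    def publish_confirmed(self, order_id, idempotency_key):
        if idempotency_key in self._seen_keys:
            return False  # retry or redelivery: effect already applied
        self.published.append(order_id)
        self._seen_keys.add(idempotency_key)
        return True


service = ToyWorkflowService()
# Record "42" is delivered twice, as after a crash before the offset commit.
for key, payment in [("42", "pay-1"), ("42", "pay-1"), ("43", "pay-9")]:
    on_message(service, key, payment)
print(service.starts)                               # 2
print(service.executions["order-42"].payment_ids)   # {'pay-1'}

publisher = IdempotentPublisher()
for _ in range(3):  # e.g. Activity retries after timeouts
    publisher.publish_confirmed("42", idempotency_key="order-42:confirmed")
print(publisher.published)  # ['42']
```

The duplicates are absorbed in two places: the stable ID collapses repeated starts, and the key collapses repeated side effects.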
Java

    public void publishConfirmed(String orderId) {
        producer.beginTransaction();
        try {
            producer.send(new ProducerRecord<>("order-confirmed", orderId, orderId)).get();
            producer.commitTransaction();
        } catch (Exception ex) {
            producer.abortTransaction();
            // publishConfirmed declares no checked exceptions, so rethrowing ex as-is would not
            // compile; Activity.wrap turns it into an unchecked exception the retry policy handles.
            throw Activity.wrap(ex);
        }
    }

This kind of publishing Activity is useful when Workflow progress must result in one or more Kafka records that either all appear or all fail together. The producer should be configured for idempotence, durable acknowledgments, and a transactional.id. The design should still assume that non-Kafka side effects may need compensation. Temporal's error-handling guidance recommends rollback logic with the Saga pattern for multi-step processes. That maps naturally to workflows that reserve inventory, attempt payment, and publish status. If one step fails after another has already succeeded, the workflow compensates in reverse order.

Long-Running Streams Need Long-Running Discipline

Once Kafka is feeding entity-centric workflows for days or weeks, operational details start to matter as much as API design. Reusing the same business key as the Kafka record key and the Temporal Workflow ID creates a clean ownership model. Kafka uses keys to select partitions, partitions remain totally ordered, and Temporal guarantees that only one Workflow Execution with a given ID is open at a time. That alignment naturally serializes updates for a customer, order, or account across both systems. At the same time, the Kafka side of the bridge should stay thin enough to keep polling regularly, because consumers that stop polling can be considered dead and rebalanced out of the group. Temporal workflows that receive large numbers of Signals or perform many Activity calls also need history management.
Event History is the mechanism that makes recovery possible, but it has performance limits and hard ceilings. Temporal emits warnings as history grows and recommends Continue-As-New for long-running executions or for workloads that process thousands of events. That becomes especially important in Kafka-driven entity workflows. There, a single logical process can become a permanent mailbox unless it periodically rolls forward into a fresh run. Code evolution must also be handled deliberately because workflow logic is replayed. Temporal's versioning guidance requires patching or worker versioning when changes would otherwise introduce non-determinism for in-flight executions.

Conclusion

Temporal and Kafka work best together when each is allowed to solve the problem it was built for. Kafka should distribute ordered, replayable events across the system boundary, and Temporal should hold the durable state machine that decides what those events mean over time. With that separation, retries stop leaking into application code, timers stop depending on process uptime, and compensations stop turning into chains of callbacks and ad hoc status flags. The result is not merely a system that survives failures. It is a system whose failure semantics remain understandable under load, redelivery, redeployments, and long-running business latency.
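The Saga pattern this article recommends for multi-step processes can be shown as plain control flow. Run the steps in order. If one fails, run the compensations of the steps that already succeeded, in reverse order. The Python sketch below is only that control flow. In a Temporal workflow, each action and compensation would be an Activity with its own retry policy, and the Java SDK also offers a Saga helper class for this. The run_saga function and the inventory example are hypothetical.

```python
def run_saga(steps):
    """Run (name, action, compensation) steps in order; on failure, compensate
    completed steps in reverse order. Returns (succeeded, log)."""
    completed, log = [], []
    for name, action, compensation in steps:
        try:
            action()
        except Exception as exc:
            log.append(f"failed {name}: {exc}")
            for done_name, undo in reversed(completed):
                undo()
                log.append(f"compensated {done_name}")
            return False, log
        log.append(f"completed {name}")
        completed.append((name, compensation))
    return True, log


inventory = {"sku-1": 5}

def reserve():
    inventory["sku-1"] -= 1

def release():
    inventory["sku-1"] += 1

def capture_payment():
    raise RuntimeError("card declined")

steps = [
    ("reserve_inventory", reserve, release),
    ("capture_payment", capture_payment, lambda: None),
    ("publish_confirmed", lambda: None, lambda: None),
]
ok, log = run_saga(steps)
print(ok)                  # False
print(inventory["sku-1"])  # 5
print(log)  # ['completed reserve_inventory', 'failed capture_payment: card declined', 'compensated reserve_inventory']
```

Only steps that actually completed are compensated, which is why the failed payment step is never "undone" and publish_confirmed never runs.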

By Akhil Madineni
The Big Data Architecture Blueprint: Core Storage, Integration, and Governance Patterns

Building scalable data systems often feels like navigating an endless sea of shifting paradigms. Engineers and architects are constantly forced to choose between centralizing and distributing data, between batch and real-time stream processing, and between strict compliance and rapid self-service analytics. Without a structured taxonomy, engineering teams risk building fragmented pipelines that accumulate technical debt. The blueprint below serves as a Data Patterns and Practices Library to help you align your infrastructure with proven engineering methodologies.

Architectural Patterns

- Data lake: A centralized repository that stores structured and unstructured data at any scale, keeping raw data available for many kinds of analytics.
- Data warehouse: A large, centralized repository for storing and managing structured data, optimized for high-performance analytics and reporting.
- Lambda architecture: A data processing architecture that combines batch and stream processing for fault-tolerant, scalable, real-time data analytics.
- Kappa architecture: A data processing architecture that simplifies the Lambda architecture by using only stream processing for both real-time and historical data.
- Microservices architecture: A design approach that structures applications as a collection of small, independently deployable services, allowing greater flexibility and scalability.
- Event-driven architecture: A software design pattern built around producing, detecting, and reacting to events, enabling loose coupling and high scalability in distributed systems.
- Polyglot persistence architecture: A data storage strategy that uses multiple types of databases, each chosen to fit the needs of the data it stores.
- Data mesh: A decentralized approach to data architecture built on domain-oriented data ownership, self-serve data infrastructure, and product-oriented data delivery.
- Data vault: A hybrid data modeling and storage methodology that combines aspects of 3NF and star schema to create a scalable, flexible, and auditable solution.
- Streaming-first: An approach that prioritizes real-time data processing and analysis using event streaming technologies.

Storage Patterns

- Sharding: A method of distributing data across multiple database servers to improve performance and scalability.
- Partitioning: The process of dividing a large table into smaller, more manageable pieces to improve query performance.
- Replication: The process of copying data from one database to another to ensure availability, redundancy, and load balancing.
- Federated storage: A storage architecture that integrates multiple storage systems under a unified management framework.
- Object storage: A scalable architecture that manages data as objects rather than files or blocks, well suited to large volumes of unstructured data.
- Columnar storage: A format that stores data by column rather than by row, which is particularly suited to analytics workloads.
- Time-series storage: A specialized storage system designed to handle time-stamped data, such as sensor readings or stock prices, efficiently.
- Graph storage: A system optimized for storing and querying graph data, representing entities and their relationships as an interconnected structure.
- In-memory storage: A storage architecture that keeps data in RAM instead of on disk for significantly faster processing.
- Hybrid storage: A solution that combines different storage types, such as on-premises and cloud, to optimize cost and performance.
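To make the columnar idea concrete, here is a toy, in-memory sketch that pivots row-oriented records into per-column lists. It is not how Parquet or any real engine is implemented, and the record fields and function names are illustrative assumptions. After the pivot, an aggregate over one column reads only that column's values. That is the property the columnar storage entry above relies on.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def to_columnar(rows: List[Row]) -> Dict[str, List[Any]]:
    """Pivot row-oriented records into one contiguous list per column.

    Assumes every record has the same fields; otherwise the columns would misalign.
    """
    columns: Dict[str, List[Any]] = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


def column_sum(columns: Dict[str, List[Any]], name: str) -> float:
    """An aggregate reads only the one column it needs; other columns stay untouched."""
    return sum(columns[name])


# Row layout: every field of a record sits together, as in a transactional table.
orders = [
    {"order_id": 1, "region": "EU", "amount": 120.0, "notes": "gift wrap"},
    {"order_id": 2, "region": "US", "amount": 80.5, "notes": ""},
    {"order_id": 3, "region": "EU", "amount": 42.0, "notes": "express"},
]
columns = to_columnar(orders)
total = column_sum(columns, "amount")  # never touches "notes" or "region"
```

Real columnar formats layer compression, encodings, and per-column statistics on top of this layout. Those additions are what let engines skip data they do not need.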
Integration Patterns

- Extract, transform, load (ETL): A process of extracting data from source systems, transforming it, and loading it into a target system.
- Extract, load, transform (ELT): A variation of ETL in which data is first loaded into the target system and then transformed using the target's processing power.
- Change data capture (CDC): A technique for capturing and processing changes in source data to enable incremental updates to target systems.
- Data federation: A technique for integrating data from disparate sources without physically moving or copying it, providing a unified view.
- Data virtualization: An approach that abstracts underlying data sources, allowing users to access and manipulate data without knowing its physical location.
- Data replication: The process of copying data from one database to another to ensure data availability and redundancy.
- Data synchronization: The process of keeping data in multiple locations consistent and up to date by propagating changes.
- Data preparation: The process of cleaning, transforming, and enriching data to make it suitable for analysis or processing.
- Publish/subscribe: A messaging pattern that decouples data producers and consumers through an intermediary message broker.
- Request/reply pattern: A messaging pattern in which a data consumer sends a request and waits for a response, enabling synchronous communication.
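Change data capture is easiest to reason about as an ordered log of row changes applied to a target. The sketch below assumes a hypothetical event shape made of an operation, a primary key, the new row image, and a log sequence number. Real CDC tools each define their own envelope. The function keeps a checkpoint so that re-delivered events are skipped, which makes incremental updates safe to retry.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ChangeEvent:
    """Hypothetical change-event shape used only for this illustration."""
    op: str                         # "insert", "update", or "delete"
    key: int                        # primary key of the changed row
    row: Optional[Dict[str, Any]]   # new row image (None for deletes)
    lsn: int                        # monotonically increasing log sequence number


def apply_changes(target: Dict[int, Dict[str, Any]],
                  events: List[ChangeEvent],
                  checkpoint: int) -> int:
    """Apply events newer than `checkpoint` to `target`; return the new checkpoint."""
    for event in sorted(events, key=lambda e: e.lsn):
        if event.lsn <= checkpoint:
            continue  # already applied by an earlier run, so re-delivery is harmless
        if event.op in ("insert", "update"):
            target[event.key] = dict(event.row or {})
        elif event.op == "delete":
            target.pop(event.key, None)
        else:
            raise ValueError(f"unknown CDC operation: {event.op}")
        checkpoint = event.lsn
    return checkpoint
```

The caller persists the returned checkpoint alongside the target. On restart, it replays from the source log without double-applying changes.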
Data Analytics

- Descriptive analytics: The analysis of historical data to understand past events and trends, often presented through reports or dashboards.
- Diagnostic analytics: The process of examining data to determine the causes of past events using techniques like data mining or correlation analysis.
- Predictive analytics: The use of data, statistical algorithms, and machine learning to predict future events based on historical data.
- Prescriptive analytics: The process of recommending actions or decisions based on data analysis using optimization or simulation algorithms.
- Real-time analytics: The analysis of data as it is generated or received to provide immediate insights and support rapid decision-making.
- Batch analytics: The processing and analysis of large volumes of data in batches, often scheduled at regular intervals.
- Text analytics: The process of extracting meaningful information from unstructured text using natural language processing.
- Geospatial analytics: The analysis of geographically referenced data to interpret spatial relationships and patterns.
- Sentiment analytics: A technique that uses NLP to determine the sentiment or emotion expressed in textual data.
- Network analytics: The analysis of network data to uncover patterns and interactions between nodes (entities) in a network.
Data Management

- Master data management (MDM): The process of creating a single, authoritative source of truth for critical business data.
- Reference data management (RDM): The practice of managing shared data (such as codes or categories) used across multiple systems for consistency.
- Metadata management: The process of creating and maintaining data about data to facilitate discovery and governance.
- Data catalog: A searchable inventory of an organization's data assets, including datasets and reports.
- Data lineage: The practice of tracking the flow of data through systems, including its origin and transformations.
- Data versioning: The process of tracking and managing changes to data over time for recovery and auditing.
- Data provenance: The process of documenting the origin, history, and processing of data to ensure trustworthiness and traceability.
- Data lifecycle management: A comprehensive approach to managing data from creation to archival or deletion.
- Data virtualization: A technique that abstracts underlying data sources to allow access without knowledge of their physical location or structure.
- Data profiling: The process of assessing data quality by collecting statistics and identifying patterns or anomalies.
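A minimal data profiling sketch follows, assuming a column arrives as a plain Python list. It reports the null rate, cardinality, and most frequent value. It also reports "shape" patterns, with digits mapped to 9 and letters to A, a common way to surface format anomalies. The function names are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List


def value_shape(value: Any) -> str:
    """Reduce a value to its format: digits become 9, letters become A."""
    return "".join("9" if c.isdigit() else "A" if c.isalpha() else c for c in str(value))


def profile_column(values: List[Any]) -> Dict[str, Any]:
    """Collect simple statistics that expose gaps, skew, and format anomalies."""
    non_null = [v for v in values if v is not None]
    counts = Counter(non_null)
    return {
        "count": len(values),
        "null_rate": (len(values) - len(non_null)) / len(values) if values else 0.0,
        "distinct": len(counts),
        "most_common": counts.most_common(1)[0] if counts else None,
        "shapes": dict(Counter(value_shape(v) for v in non_null)),
    }
```

For example, a column holding "12-34", "12-34", None, and "ab-12" yields a 25% null rate and two shapes, "99-99" and "AA-99". The rarer shape is the kind of outlier a profiling step would flag for review.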
Data Governance

- Data stewardship: The practice of overseeing an organization's data to ensure quality, consistency, and compliance.
- Data quality management: The process of measuring and improving the accuracy, completeness, and consistency of data.
- Data policy management: The development and enforcement of standards and procedures that govern data use.
- Data classification: The process of categorizing data based on sensitivity or risk to implement appropriate security measures.
- Data retention and archival: Defining policies for storing and disposing of data based on legal and business requirements.
- Data privacy compliance: Ensuring data practices adhere to laws and regulations like GDPR or CCPA.
- Data lineage and provenance: Tracking the origin and flow of data through systems to ensure accuracy and compliance.
- Data cataloging and discovery: Maintaining a searchable repository that provides an inventory of an organization's data assets.
- Data risk management: Identifying and mitigating data-related risks such as breaches or corruption.
- Data ownership: Assigning accountability for data assets to specific individuals or teams to ensure proper management.
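Data classification is often bootstrapped with pattern rules run over sampled column values. The sketch below uses a hypothetical rule set, an SSN-like pattern and an email pattern, with made-up sensitivity labels. A real program would tie labels to its own policy and use far more robust detectors.

```python
import re
from typing import Iterable, List, Pattern, Tuple

# Hypothetical rules: (label, pattern). Labels and patterns are illustrative only.
RULES: List[Tuple[str, Pattern[str]]] = [
    ("restricted", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),            # US SSN-like values
    ("confidential", re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")),   # email addresses
]
SENSITIVITY = {"public": 0, "confidential": 1, "restricted": 2}


def classify_column(sample_values: Iterable[object]) -> str:
    """Return the most sensitive label matched by any sampled value."""
    label = "public"
    for value in sample_values:
        for rule_label, pattern in RULES:
            if pattern.search(str(value)) and SENSITIVITY[rule_label] > SENSITIVITY[label]:
                label = rule_label
    return label
```

The returned label can then drive the security controls described in the next section, such as masking a "restricted" column before it reaches analysts.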
Data Security

- Data encryption: Encoding data to protect it from unauthorized access both at rest and in transit.
- Data masking: Obscuring sensitive data by replacing it with fictitious data to prevent exposure to unauthorized users.
- Data tokenization: Substituting sensitive data with non-sensitive tokens while still enabling some operations and analytics.
- Data access control: Defining policies that determine who can access or modify data based on roles and security requirements.
- Data auditing: Monitoring and recording data activities to detect unauthorized access or compliance violations.
- Data anonymization: Removing personally identifiable information (PII) from datasets to protect individual privacy.
- Data pseudonymization: Replacing sensitive data with artificial identifiers to reduce re-identification risk.
- Data security monitoring: Continuously analyzing systems and networks for potential security threats or breaches.
- Data activity monitoring: Continuously analyzing database transactions to detect unauthorized access or policy violations.
- Data loss prevention: Tools and practices designed to protect sensitive data from unauthorized leakage or theft.

Key Use Cases and Architectural Examples

1. Real-Time Distributed Processing for High-Velocity Streams

For platforms that need immediate analytical insights, the primary challenge is handling large-scale data streams while keeping architectural complexity low.

Core patterns: Kappa Architecture, Streaming-first, and In-Memory Storage.
Production tech stack: Apache Kafka, PySpark Structured Streaming, and Redis.

Specific example: In a high-volume financial transaction system, a Kappa Architecture simplifies the processing pipeline by routing both real-time logs and historical data events through a single stream engine. By taking a streaming-first approach built on an Apache Kafka cluster, the platform eliminates the complex dual-pipeline maintenance found in traditional Lambda setups.
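The Kappa idea in this example is that history and live traffic share one code path. Reprocessing means replaying the retained Kafka log through the same job instead of running a separate batch pipeline. The plain-Python sketch below stands in for a Spark Structured Streaming job and is not Spark code. The event shape, window size, and class name are assumptions, and the sketch ignores late-arriving data and watermarks.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Event = Tuple[int, str, float]  # (event_time_epoch_s, account_id, amount): assumed shape


class WindowedTotals:
    """Stateful tumbling-window sum per account, keyed by event time."""

    def __init__(self, window_seconds: int = 60) -> None:
        self.window_seconds = window_seconds
        self.totals: Dict[Tuple[str, int], float] = defaultdict(float)

    def process(self, event: Event) -> None:
        ts, account, amount = event
        window_start = ts - ts % self.window_seconds  # align to the window boundary
        self.totals[(account, window_start)] += amount


def consume(processor: WindowedTotals, events: Iterable[Event]) -> WindowedTotals:
    """The single code path, used for historical replay and live events alike."""
    for event in events:
        processor.process(event)
    return processor


# Historical reprocessing: replay the whole retained log through the same logic.
log = [(0, "acct-1", 10.0), (30, "acct-1", 5.0), (61, "acct-1", 1.0), (10, "acct-2", 2.0)]
replayed = consume(WindowedTotals(), log)

# Live processing: the same logic fed incrementally as events arrive.
live = WindowedTotals()
consume(live, log[:2])
consume(live, log[2:])
```

Both runs end in the same state. That equivalence is what lets a Kappa design drop the separate batch layer of a Lambda architecture.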
In this design, a PySpark Structured Streaming application consumes the Kafka event streams directly, executing stateful window transformations on the fly. To support sub-millisecond fraud lookups, the working state or frequently queried reference tables are held in an In-Memory Storage layer such as Redis, which provides access speeds that disk-based alternatives cannot match.

2. Decentralized Architecture for Enterprise Scaling

Large organizations often hit engineering bottlenecks when a single, centralized team manages a massive monolithic data lake.

Core patterns: Data Mesh, Data Governance, and Data Cataloging and Discovery.
Production tech stack: Databricks Unity Catalog, AWS Lake Formation, and Snowflake Data Sharing.

Specific example: A multinational bank transitions to a Data Mesh framework, shifting data asset ownership away from a centralized team to domain-oriented groups, such as Risk Modeling and Retail Analytics, which deliver data as independent products. To maintain unified compliance, the infrastructure relies on strict Data Governance policies managed through Databricks Unity Catalog and AWS Lake Formation. These policies enforce centralized data stewardship, role-based access control, and automated data classification. The domain datasets are then shared securely across departments via Snowflake Data Sharing. A centralized Data Catalog runs continuously on top of these endpoints, giving developers across the enterprise a single, searchable inventory for discovering, auditing, and consuming cross-domain data products.

3. High-Performance Cloud Analytics and Reporting

To make the most of modern cloud infrastructure, data pipelines must maximize query performance while containing compute and storage costs.

Core patterns: Extract, Load, Transform (ELT) and Columnar Storage.
Production tech stack: dbt (Data Build Tool), Delta Lake, Snowflake, and Apache Spark.
Specific example: A modern enterprise analytics platform ingests massive volumes of raw operational data into cloud object storage, choosing a flexible ELT pipeline over a traditional ETL framework. Raw files are loaded directly into a target data platform such as Snowflake or Databricks Delta Lake. The platform's cloud elasticity then runs complex transformations after loading, using dbt or optimized Spark SQL queries. To maximize business intelligence performance, the underlying files are stored in a columnar format such as Parquet. Because data is laid out by column rather than by row, analytical queries read only the columns a report requests. This cuts disk I/O and speeds up complex calculations across billions of historical records.

Conclusion

Successfully implementing a modern data infrastructure is never about finding a single pattern to solve every corporate challenge. True architectural maturity lies in knowing how to weave these paradigms together. By mapping tactical storage choices directly to overarching governance and integration frameworks, software architects can build resilient environments capable of evolving alongside business demands.

Which of these three architectural focus areas best matches your current production environment? Let me know in the comments below.

By Ram Ghadiyaram, DZone Core
Is the Data Warehouse Dead? 3 Patterns From Enterprise Architecture That Answer This Question

Architectural Debate

There is a classic debate that data architects often have among themselves: how to fit a traditional data warehouse into a data lake or enterprise data platform. This article walks through the evolution of the architecture and describes three patterns I have implemented across enterprises, to help you decide where a data warehouse fits in a modern data platform.

The data warehouse acted as a single source of truth that finance, retail, and operations teams could trust for day-to-day reporting. Appliance warehouses such as Teradata, Netezza, and Sybase IQ dominated enterprise data for decades, and SQL was the universal language that held it all together.

Then two things happened at the same time: data volumes outgrew what any single warehouse could handle, and cloud storage made it cheap to store everything. This created a genuine architectural dilemma that most organizations have not (yet) resolved cleanly.

The most common mistake I have seen is the “one size fits all” approach. Workloads are mixed together based on people and “rigid” processes rather than on each workload's purpose or usage. Over time, this adds cost overhead and significantly limits the ability to get real value from the data.

I have implemented data platforms at a large US beverage manufacturer and a global media company, and as part of AWS data architecture practices. Across that work, I have seen three distinct patterns emerge. Each has a legitimate use case. Each also has a failure mode that is entirely predictable, and yet organizations keep hitting it.

Context: What the Warehouse Was Actually Good At

Before evaluating patterns, it helps to be precise about what warehouses solved:

- Transformations. CDC from transactional sources, slowly changing dimensions (SCD), and fact aggregation. SQL warehouses handled these use cases reliably, with SQL, ETL tooling, or both.
- Reporting. Pre-computed, governed, low-latency access for drill-down, roll-up, and summarization.

Data lakes historically struggled with both. ACID compliance was unreliable, and complex transformations required significant engineering. Open table formats such as Apache Iceberg, Apache Hudi, and Delta Lake changed this equation by bringing warehouse-comparable reliability to object storage. That shift is what makes the three patterns below possible.

Pattern 1: Data Warehouse as the Enterprise Data Platform

Best for: BI-heavy organizations, mature SQL teams

Data flow: [diagram not included]

I have observed this pattern repeatedly in organizations with a large BI reporting base, where canned reports, dashboards, and self-service BI still dominate business requirements today.

When an engineer should choose this: Your organization has a purely BI-driven workload today, your team is SQL-native, and there are currently no ML, streaming, or unstructured data requirements.

Where it breaks: The core problem is not what this pattern does; it is what it cannot do when analytical needs evolve. And in my experience, they always do. Specific failure modes engineers hit at scale:

- Transformation pressure on one engine. Ingestion, transformation, and consumption all compete for warehouse compute. At high data volumes, this creates workload management overhead or requires additional spend to separate readers from writers.
- Vertical scaling becomes the only option. When the warehouse chokes under volume, the operations team scales up the cluster or splits reader/writer nodes by business unit. Both options add cost without adding architectural flexibility.
- Object storage is a dump, not an asset. Raw files land in object storage but are never cataloged, versioned, or governed. Schema evolution and data lineage take significant engineering effort.
- No distributed compute path. ML model training, log analytics, clickstream analysis, and unstructured data have no home in this architecture.
- Re-architecture is expensive when needs change. Moving away from this pattern requires re-architecting, which often ends in the purchase of a costly SQL-driven SaaS solution because the platform was not built correctly in the first place.

Summary: Pattern 1 works cleanly for what it is designed for. The problem is that the design is not future-proof and significantly limits the value that data can generate.

Pattern 2: No Warehouse Scenario

Best for: Engineering-led teams, ad hoc analytics, big data exploration

Data flow: There is no data warehouse in the stack; instead, a serverless query engine provides a SQL abstraction directly over object storage. This pattern originated with Presto and Impala for big data analytics and worked well in that context.

When an engineer should choose this: Your workload is dominated by ad hoc analytical queries, ML feature exploration, or log analytics.

Where it breaks: This pattern struggles when it is applied to BI workloads. The specific challenges engineers hit:

- Query performance is tricky. Without careful partitioning and file size optimization, a 9 a.m. dashboard refresh (cold start) may trigger a full scan of terabytes of data, making BI performance unpredictable.
- Cost is unpredictable. These engines charge per terabyte scanned, not per result row, so forgetting a date filter on a large table scan can add thousands of dollars to the bill. Warehouse compute costs are predictable; object-storage query costs are not.
- Concurrency is a challenge. Warehouses have mature workload management techniques for queuing, prioritization, and resource allocation. Serverless engines have yet to reach that maturity, so concurrent dashboard users face timeouts and failures rather than graceful queuing.
- Result-set caching helps for identical repeated queries, but any variation in filter parameters triggers a full storage scan. Dynamic dashboard filters with user-specific slicers make this caching largely ineffective for BI use cases.
- Data lake schemas evolve frequently. Cloud catalogs such as Glue or Unity Catalog track changes, but downstream BI dashboards break silently when columns shift. A warehouse enforces the contract that protects reports.

Summary: Pattern 2 works well for scenarios that require distributed processing at scale. The problem is that the design is not built for low-latency use cases.

Pattern 3: Purposeful Hybrid

Best for: Mixed workloads, enterprise scale, and cost-conscious organizations with a variety of workloads and evolving needs

Data flow: [diagram not included]

In my experience, this pattern offers the best of both worlds, because it relies on deliberate data segmentation rather than a rigid architectural decision. It routes each workload to the compute and storage that are purpose-built for it. ML models consuming ten years of data should not compete with operational dashboards that need sub-second response on last quarter's inventory. Object storage is cheap and scales horizontally for everything, whereas the warehouse holds only what requires low-latency relational access.

Implementation on the warehouse side: Instead of loading the full silver and gold layers into the warehouse, load only the date range business users need for low-latency reporting. In retail, inventory movement rarely needs more than a rolling year in the warehouse. In travel and hospitality, promotional rate performance reports typically span a few months. A scheduled query copies the relevant date-partitioned data from the data lake into the warehouse and purges expired ranges from the warehouse once they age out. The source data remains in object storage permanently. This single mechanism drastically reduces warehouse compute and storage costs while keeping BI response times fast.

Handling historical range queries: The common objection is: what about reports that need five years of history? The right engineering question is not "how do we make that fast?" but "does that query need millisecond latency?"
A five-year inventory trend analysis requested once a quarter may not need a millisecond response time. This is exactly the use case for federated queries. Redshift Spectrum, Athena, and Synapse Serverless allow external tables to be defined over S3/ADLS data and queried alongside physical warehouse tables with standard SQL joins. For example, a retail analyst can query this week's inventory in Redshift, joined against three years of history in S3, without moving a single byte:

```sql
-- Example: federated query joining warehouse (hot) + lake (cold)
SELECT w.sku,
       w.inventory_count,
       h.avg_inventory_12m
FROM warehouse.inventory_current w
JOIN external_schema.inventory_history h  -- data lives in S3
  ON w.sku = h.sku
WHERE w.report_date = CURRENT_DATE;
```

Decision Framework

The following table compares the three patterns:

| Criterion | Pattern 1 | Pattern 2 | Pattern 3 |
|---|---|---|---|
| BI performance | High | Unpredictable | High |
| ML / unstructured | None | Native | Native |
| Cost at scale | Expensive | Unpredictable | Optimized |
| Governance | Low | Strong | Strong |
| Scalability | Vertical | Horizontal | Horizontal |
| Implementation complexity | Low | Medium | High |
| Future-readiness | Low | Medium | High |
| Team skill required | SQL | Spark/distributed computing | Both |

The mistake is not choosing the wrong pattern; it is applying one pattern to all workloads. Every large enterprise I have worked with has a mix of workload types that no single architecture serves equally well.

| Situation | Recommended pattern |
|---|---|
| >70% BI and reporting workload | Pattern 1 or 3 |
| Heavy ML, log, or unstructured data | Pattern 2 or 3 |
| Petabyte scale with cost pressure | Pattern 3 |
| Small to mid-size, SQL-native team | Pattern 1 |
| Ad hoc analytics dominant | Pattern 2 |
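Pattern 3 depends on two small pieces of logic. The first keeps a rolling hot window in the warehouse and purges partitions as they age out. The second sends anything older to a federated query over the lake. The sketch below assumes daily date partitions and an inclusive window ending today. The function names are hypothetical, and the window length would come from the business requirement, for example a rolling year for retail inventory.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional, Tuple

DateRange = Tuple[date, date]  # inclusive (start, end)


def hot_window_start(today: date, hot_days: int) -> date:
    """First day still kept in the warehouse (the window includes today)."""
    return today - timedelta(days=hot_days - 1)


def plan_query(start: date, end: date, today: date,
               hot_days: int = 365) -> Dict[str, Optional[DateRange]]:
    """Split an inclusive date range into a warehouse part and a federated lake part.

    The lake holds everything, but routing only the cold part there keeps
    recent reads on the low-latency path.
    """
    if start > end:
        raise ValueError("start must not be after end")
    hot_start = hot_window_start(today, hot_days)
    plan: Dict[str, Optional[DateRange]] = {"warehouse": None, "lake": None}
    if end >= hot_start:
        plan["warehouse"] = (max(start, hot_start), end)
    if start < hot_start:
        plan["lake"] = (start, min(end, hot_start - timedelta(days=1)))
    return plan


def partitions_to_purge(loaded: List[date], today: date, hot_days: int = 365) -> List[date]:
    """Date partitions that aged out of the warehouse; the lake copy is kept."""
    hot_start = hot_window_start(today, hot_days)
    return sorted(d for d in loaded if d < hot_start)
```

With a 30-day window ending 2024-06-30, a report on June 10 through June 20 is served entirely by the warehouse. A multi-year trend splits into a warehouse slice starting June 1 and a federated slice for everything earlier.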

By Nabarun Bandyopadhyay
Building Threat Intelligence Pipelines Using Python, APIs, and Elasticsearch

Threat intelligence becomes operationally valuable when indicator data can be collected continuously, normalized into a consistent schema, and queried fast enough to support enrichment and detection workflows. Standardized exchange formats such as STIX and transport protocols such as TAXII exist specifically to make machine-readable cyber threat intelligence easier to distribute at scale, while preserving enough structure for downstream correlation and context.

Operational Requirements That Shape Intelligence Pipelines

A threat intelligence pipeline is best treated as data engineering with security-specific constraints: provenance must remain intact, updates and revocations must be representable, and “freshness” should be measurable rather than assumed. STIX is explicitly designed to model cyber threat intelligence using typed objects with attributes, and it supports building richer context by linking objects through relationships rather than shipping flat indicator lists.

A practical pipeline design often separates raw ingestion from normalized storage. Raw ingestion preserves the original feed payload for auditability and reversibility, while normalized storage produces documents that are easy to match against telemetry. This split aligns with STIX’s modeling approach, in which producers may publish Indicators expressed as STIX patterns and connect them to other objects through relationship constructs. Consumers can then choose between lightweight atom extraction for matching and graph-style context for analysis.

Pulling From TAXII and Other APIs Without Losing Provenance

TAXII 2.1, published by OASIS Open, defines a RESTful API and related requirements for TAXII clients and servers to exchange cyber threat information in a scalable manner, with STIX 2.1 support described as mandatory to implement in the TAXII context.
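One way to keep provenance intact during the raw-ingestion step is to wrap every fetched object in an envelope. The envelope records where and when the object came from, plus a content hash for auditing. This is a sketch under stated assumptions: the envelope field names, such as payload_sha256, are hypothetical. The canonicalization (sorted keys, compact separators) is also simpler than RFC 8785, so it is stable within one Python pipeline but not a cross-language standard.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def canonical_json(obj: Dict[str, Any]) -> str:
    """Stable serialization: key order no longer changes the hash."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def wrap_raw(stix_obj: Dict[str, Any], provider: str, collection_id: str,
             fetched_at: Optional[datetime] = None) -> Dict[str, Any]:
    """Build a raw-zone record that preserves the untouched upstream object."""
    fetched_at = fetched_at or datetime.now(timezone.utc)
    payload = canonical_json(stix_obj)
    return {
        "provider": provider,
        "collection_id": collection_id,
        "fetched_at": fetched_at.isoformat(),
        "stix_id": stix_obj.get("id"),
        "stix_modified": stix_obj.get("modified"),        # distinguishes object versions
        "stix_revoked": bool(stix_obj.get("revoked", False)),
        "payload_sha256": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
        "payload": payload,  # kept verbatim for audit and later re-normalization
    }
```

Because the hash is computed over canonical JSON, the same object delivered twice with differently ordered keys yields the same payload_sha256. That makes duplicate deliveries easy to spot in the raw zone.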
The IANA media type registration for application/taxii+json also documents that the older application/vnd.oasis.taxii+json name is a deprecated alias. This matters in real integrations, because content negotiation and strict header validation vary by server implementation.

TAXII 2.1 also formalized mechanics that directly affect pipeline correctness under load. The CTI documentation notes that TAXII 2.1 added limit and next URL parameters and updated content negotiation and media types. These changes reflect a move toward pagination patterns that handle large or rapidly changing datasets more safely than item-based offset pagination.

A Python pipeline can either implement paging logic directly or delegate it to a client library. The taxii2client project documents a TAXII 2.1 client API that uses application/taxii+json;version=2.1 for Accept handling. It also provides an as_pages helper for TAXII 2.1 endpoints that support pagination, including “Get Objects” and “Get Manifest.”

```python
from taxii2client.v21 import as_pages

TAXII_ACCEPT = "application/taxii+json;version=2.1"


def iter_taxii_objects(collection, cursor, page_size=2000):
    """Yield STIX objects added after `cursor` from a taxii2client v21 Collection."""
    # as_pages requests successive pages (limit/next) until the server reports
    # no more data, so paging tokens never leak into pipeline code.
    for page in as_pages(collection.get_objects, per_request=page_size,
                         added_after=cursor, accept=TAXII_ACCEPT):
        # The client normally returns a parsed envelope (dict); the fallback
        # handles a raw response object.
        envelope = page if isinstance(page, dict) else page.json()
        for obj in envelope.get("objects", []):
            yield obj
```

This pattern keeps server-specific pagination tokens out of pipeline logic while still enabling incremental collection reads. The cursor argument can be persisted as an ISO-8601 timestamp when the upstream provides a timestamp filter, a model commonly used by TAXII feed vendors. For example, ESET documents STIX 2.1 feeds delivered via TAXII 2.1 collections and describes an added_after filter parameter for retrieving objects added after a specified timestamp. ESET also documents retention constraints that make incremental pulls operationally necessary.

Not all threat intelligence sources are TAXII-first.
MISP Project documentation describes a REST-accessible STIX export capability and explicitly notes that STIX XML export can be slow and lead to timeouts with large events or collections. STIX JSON avoids that problem, making JSON a more stable transport choice for high-volume automation. The same ecosystem provides a published OpenAPI specification and a dedicated converter library, misp-stix. The library supports bidirectional conversion across STIX versions, including STIX 2.1, and includes features such as pattern parsing and indicator-observable fingerprinting. These features reduce the cost of maintaining bespoke mapping logic for every upstream source.

Normalization Into ECS and STIX-Aware Semantics

Normalization is where a pipeline either becomes queryable or becomes another archive. The Elastic Common Schema (ECS) threat field guidance explicitly frames threat.* as the mapping layer that normalizes threat intelligence indicators from many structures into consistent fields. It also links that normalization to detection and enrichment workflows such as indicator match rules. In particular, the guidance calls out normalizing indicators into threat.indicator.* so that disparate feeds can be queried consistently and used to build indicator matching logic without treating every provider as a special case.

Atomic indicators benefit from being stored both as a “typed value” and with vendor identifiers. ECS defines threat.indicator.type values aligned with cyber observable types and documents threat.indicator.id as a place to store indicator IDs. It notes that a STIX 2.x indicator ID is a common choice and that the field can hold multiple values to represent the same indicator across systems. In practice, a pipeline can preserve the upstream STIX identifier, attach a stable provider-local identifier when necessary, and still normalize the matchable indicator value into fields such as threat.indicator.ip or other threat.indicator.* subfields.
```python
def stix_confidence_to_nlmh(value):
    """Map a STIX 0-100 confidence value onto the None/Low/Medium/High scale."""
    if value is None:
        return "Not Specified"
    v = int(value)
    if v == 0:
        return "None"
    if 1 <= v <= 29:
        return "Low"
    if 30 <= v <= 69:
        return "Medium"
    if 70 <= v <= 100:
        return "High"
    return "Not Specified"  # out-of-range values are treated as unknown


def extract_atomic_from_pattern(pattern):
    """Return (stix_type, value) for simple single-comparison patterns only,
    such as [ipv4-addr:value = '198.51.100.7']; otherwise (None, None)."""
    p = (pattern or "").strip()
    # Refuse composite patterns rather than silently keeping only the first value.
    if any(op in p for op in (" AND ", " OR ", " FOLLOWEDBY ")):
        return (None, None)
    for stix_type in ("ipv4-addr", "domain-name", "url"):
        if stix_type + ":value" in p and "'" in p:
            return (stix_type, p.split("'")[1])
    return (None, None)


def stix_indicator_to_ecs(indicator_obj, provider, fetched_at_iso):
    """Normalize one STIX 2.1 Indicator into an ECS threat.indicator.* document."""
    itype, ivalue = extract_atomic_from_pattern(indicator_obj.get("pattern"))
    if not itype:
        return None  # skip anything that cannot be mapped deterministically
    indicator = {
        "type": itype,
        "provider": provider,
        "name": indicator_obj.get("name") or ivalue,
        "description": indicator_obj.get("description"),
        "confidence": stix_confidence_to_nlmh(indicator_obj.get("confidence")),
        "reference": indicator_obj.get("id"),
        "id": [indicator_obj.get("id")],  # preserve the upstream STIX identifier
    }
    # Put the matchable value into its type-specific ECS field.
    if itype in {"ipv4-addr", "ipv6-addr"}:
        indicator["ip"] = ivalue
    elif itype == "domain-name":
        indicator["url"] = {"domain": ivalue}
    elif itype == "url":
        indicator["url"] = {"full": ivalue}
    return {
        "@timestamp": fetched_at_iso,
        "event": {"kind": "enrichment", "category": ["threat"], "type": ["indicator"]},
        "threat": {"indicator": indicator},
        "labels": {"feed": provider},
    }
```

The extraction logic deliberately limits itself to common “atomic” patterns to keep parsing deterministic and to minimize the risk of silently incorrect field derivation. This constraint matches the operational intent of the ECS indicator guidance, which emphasizes consistent querying and reuse for indicator match rules after normalization. It does not attempt to fully interpret every possible composite STIX pattern in real time.

Indexing Strategy in Elasticsearch That Avoids Accidental Cost Explosion

Elasticsearch storage decisions are not purely operational preferences, because they determine which update patterns are safe.
Data streams consist of one or more hidden backing indices and require a matching index template. Every document indexed into a data stream must include an @timestamp field mapped as a date (or date_nanos) field type. The documentation describes data streams as a good fit for most time-series use cases. It also explicitly flags that frequently reusing the same _id and expecting last-write-wins behavior suggests an index alias with a write index may fit better than a data stream. Threat intelligence pipelines often straddle that boundary: indicator state changes and revocations benefit from upsert semantics, while ingestion audits benefit from append-only history.

Retention should be tied to query strategy. Elastic Security documentation warns that indicator match rules can consume significant resources. It recommends limiting the indicator index query time range to the minimum necessary for coverage, with a default example query of the past 30 days. Even outside an alerting engine, a time-bounded indicator set tends to be operationally safer. It reduces scan cost, makes cache behavior more predictable, and avoids matching against long-expired infrastructure that is no longer relevant. When vendor retention is narrower, such as the 14-day retention window described for some TAXII feeds, the pipeline should record that constraint as a policy. It should not rely on “full historical replay” as a recovery mechanism.

Ingestion-Time Guardrails With Python, Ingest Pipelines, and Bulk Writes

Ingest pipelines provide an explicit place to enforce normalization rules at ingest time. Elastic documentation describes an ingest pipeline as a series of processors that run in order to transform data before it is indexed into a data stream or index, supporting operations such as removal, extraction, and enrichment.
In addition, ingest processors can access ingest metadata under the _ingest key. Elasticsearch notes that pipelines create _ingest.timestamp by default, and that ingest metadata is indexed only if a processor explicitly copies it into a document field.

```json
PUT /_ingest/pipeline/ti_normalize
{
  "description": "Normalize threat intel indicators into ECS threat.indicator.*",
  "processors": [
    { "set": { "field": "event.kind", "value": "enrichment" } },
    { "set": { "field": "event.category", "value": ["threat"] } },
    { "set": { "field": "event.type", "value": ["indicator"] } },
    { "set": { "field": "event.ingested", "value": "{{{_ingest.timestamp}}}" } },
    {
      "fingerprint": {
        "fields": [
          "threat.indicator.provider",
          "threat.indicator.type",
          "threat.indicator.ip",
          "threat.indicator.url.domain",
          "threat.indicator.url.full"
        ],
        "target_field": "threat.indicator.fingerprint",
        "method": "SHA-256",
        "ignore_missing": true
      }
    }
  ]
}
```

Listing the URL fields alongside threat.indicator.ip keeps fingerprints distinct for domain and URL indicators. Because ignore_missing is set, fields a document lacks are simply skipped.

Bulk ingestion should follow Elasticsearch's wire format rules. The bulk API documentation describes NDJSON requirements: the final line must end with a newline character, and JSON actions and sources must not be pretty-printed, because newlines are literal delimiters. A Python producer can serialize documents into bulk batches and optionally route them through the normalization pipeline configured above. It can also assign a deterministic _id, derived from the provider and the atomic indicator value, to make writes idempotent.
Python
import json

def build_indicator_id(provider, itype, ivalue):
    # Deterministic, case-insensitive ID: re-ingesting the same indicator overwrites it.
    return (provider + ":" + itype + ":" + ivalue).lower()

def encode_json(obj):
    # Compact, single-line JSON; json.dumps never emits newlines unless indent is set.
    return json.dumps(obj, separators=(",", ":"))

def bulk_index_indicators(es_http, index_name, docs):
    # es_http: any HTTP client exposing post(path, body=..., headers=...).
    lines = []
    for d in docs:
        ti = d.get("threat", {}).get("indicator", {})
        doc_id = build_indicator_id(
            ti.get("provider", "unknown"),
            ti.get("type", "unknown"),
            ti.get("ip", ti.get("name", "unknown")),
        )
        lines.append(encode_json({"index": {"_index": index_name, "_id": doc_id, "pipeline": "ti_normalize"}}))
        lines.append(encode_json(d))
    payload = "\n".join(lines) + "\n"  # NDJSON: the trailing newline is mandatory
    return es_http.post("/_bulk", body=payload, headers={"Content-Type": "application/x-ndjson"})
The NDJSON newline termination is not optional, so building the payload in a way that always emits a trailing newline avoids a class of partial-ingest failures that are hard to diagnose under load. For enrichment use cases, ingest-time joins should be applied cautiously: Elastic warns that the enrich processor can impact ingest speed, recommends benchmarking, and explicitly states that it is not recommended for appending real-time data; it works best with reference data that does not change frequently. This guidance aligns with threat intelligence practice: fast-changing indicators typically work better as a queried dataset, joined at search or detection time, than as an ingest-time enrichment applied to every event.
Conclusion
A threat intelligence pipeline built on Python, APIs, and Elasticsearch becomes reliable when it treats schemas, media types, and update semantics as core engineering concerns instead of integration details. STIX and TAXII provide standard object modeling and transport expectations, including content negotiation and pagination mechanics, while ECS provides a target schema that makes indicators consistently queryable and directly usable by matching workflows such as indicator match rules.
High-quality implementations preserve provenance, normalize into threat.indicator.* with STIX-aligned confidence semantics, choose an indexing strategy that matches expected update patterns, and enforce ingestion guardrails through ingest pipelines, simulation, and NDJSON-correct bulk writes.
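One guardrail the bulk writer leaves to its caller: the bulk API reports per-document outcomes in the response body (a top-level errors flag plus one entry per action under items), so a request can succeed at the HTTP level while individual documents are rejected. A minimal sketch of surfacing those failures, assuming the response has already been decoded from JSON into a dict; failed_bulk_items is a hypothetical helper name, and the sample response is illustrative data, not real output.

```python
def failed_bulk_items(response_body):
    """Return (doc_id, status, error_type) for each failed item in a _bulk response.

    Hypothetical helper: `response_body` is the decoded JSON dict returned by
    POST /_bulk.
    """
    if not response_body.get("errors"):
        return []  # the top-level flag is false when every action succeeded
    failures = []
    for item in response_body.get("items", []):
        # Each item has exactly one key naming the action (index, create, update, delete).
        result = next(iter(item.values()))
        if "error" in result:
            failures.append((result.get("_id"), result.get("status"), result["error"].get("type")))
    return failures


# Illustrative response: one indicator written, one rejected by the mapping.
sample = {
    "errors": True,
    "items": [
        {"index": {"_id": "acme:ipv4-addr:203.0.113.7", "status": 200}},
        {"index": {"_id": "acme:ipv4-addr:bad", "status": 400,
                   "error": {"type": "mapper_parsing_exception", "reason": "failed to parse"}}},
    ],
}
print(failed_bulk_items(sample))
```

Because the document IDs are deterministic, re-sending only the failed items (for example, those rejected with status 429 under backpressure) is safe and avoids replaying the whole batch.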

By Krishnaveni Musku
Optimizing Databricks Spark Pipelines Using Declarative Patterns

If you've ever inherited a Spark job that runs in 35 minutes and someone asks you to make it faster, you know the routine. You start by checking partition counts, then file sizes, then shuffle stages, then broadcast hints. You find a handwritten OPTIMIZE schedule from 2022, a Z-ORDER on the wrong column, and a cluster sized for last year's data volume. By the time you've made the job fast, you've absorbed three new things to maintain. The next person to inherit it will absorb four.
This pattern, call it the hand-tuning treadmill, is what the declarative optimization story on Databricks is trying to break. It's not a single feature; it's a cluster of capabilities that collectively let teams describe what a table should look like and let the engine handle the physical optimizations. What follows is a practical view of those patterns: where they fit, what they replace, and how to migrate without a rewrite weekend.
1. The Hand-Tuning Treadmill: Why Imperative Optimization Doesn't Scale
Before getting into the declarative side, it's worth being concrete about what "imperative Spark optimization" actually means in production. The shape is consistent across teams I've audited:
Layout decisions frozen on day one. Somebody picks a partition column when the table is created. The data shape changes a year later. Nobody re-partitions because the migration is scary. Query plans drift toward full scans.
Maintenance jobs that nobody owns. An OPTIMIZE / Z-ORDER / VACUUM script lives in a notebook scheduled at 3 AM. It runs on a cluster that's slightly mis-sized. When data volume grows, the job runs into the morning workload, and people complain about latency.
Cluster sizing as a guess. Worker count is a heuristic from a senior engineer's memory of last year's spike. Half the time it's too big, half the time it's too small, and the cost discussion gets emotional.
Hint-driven plans. Broadcast hints, repartition hints, and coalesce(N) calls are sprinkled through pipelines to fix yesterday's problem and kept indefinitely because removing them feels risky.
None of these are bugs. They're symptoms of the imperative model: the team owns the layout, the maintenance, the sizing, and the plan tuning. In small pipelines, that ownership is fine. At scale, it becomes a bottleneck the team can't outsource.
2. What "Declarative" Means in the Spark Optimization Context
"Declarative" gets used in two different ways here, and it's worth pulling them apart. Within Lakeflow pipelines (formerly DLT), it means "describe the tables, not the steps": the engine builds the DAG and runs it. In the broader optimization story, declarative also means "describe the desired property of the table or workload, not the operations to maintain it":
Layout: I want this table clustered by these columns; figure out when and how to re-cluster.
Maintenance: I want this table optimized and vacuumed; figure out the schedule.
Ingestion: I want all new files in this path picked up exactly once; figure out checkpointing and listing.
Quality: These rows must satisfy these expectations; enforce them and report what gets dropped.
Compute: I want this query fast and not wasteful; size and scale appropriately.
Each of those items corresponds to a piece of the declarative stack. Used together, they replace a remarkable amount of the boilerplate that has historically lived in Spark pipelines. The mental shift: you stop writing operations against the table and start writing properties of the table. The engine becomes the actor; you become the editor.
3. The Declarative Optimization Stack on Databricks
The chart below maps each thing the team declares to the engine capability that handles it, ending at the physical Delta table. It's the picture I draw on whiteboards when teams ask, "What's the order to adopt these in?"
Figure 1.
The declarative optimization stack: each user-facing intent at the top maps to a continuous engine behavior, which keeps the underlying Delta tables well-clustered, compacted, and statistically up-to-date without human intervention.
Two things are worth highlighting in this picture. First, every box in the engine row is something that runs continuously, not on a cron; there is no daily "optimization window" anymore. Second, the bottom layer is identical to what you'd get from any well-tuned imperative pipeline: 256 MB Parquet files with current statistics. The declarative path doesn't change what good looks like; it changes who does the work to keep things looking good.
4. Layout: Liquid Clustering Replaces Hand-Maintained Z-ORDER
Liquid Clustering is the change with the largest practical impact, because partition-key choices are where most lakehouse pipelines accumulate the most technical debt. The declarative version: you specify the columns the data is most often filtered or joined by, and the engine maintains a layout that supports those access patterns, incrementally as new data arrives and without a full rewrite. When access patterns change, you change the cluster columns, and the engine re-clusters in the background.
Defining Liquid-Clustered Tables
SQL
-- New table, clustered by the columns most commonly filtered on.
-- No more PARTITIONED BY, no more guessing at partition cardinality.
CREATE TABLE prod.gold.daily_totals (
  account_id   STRING,
  region       STRING,
  ingest_date  DATE,
  daily_total  DECIMAL(18,2),
  txn_count    BIGINT
)
USING DELTA
CLUSTER BY (region, ingest_date, account_id);

-- Even better: let the engine pick the clustering columns by
-- observing real query patterns over time.
CREATE TABLE prod.gold.events_clustered
USING DELTA
CLUSTER BY AUTO
AS SELECT * FROM prod.silver.events;
Migrating an Existing Partitioned/Z-ORDER Table
SQL
-- Convert a legacy partitioned table to liquid clustering.
-- Existing data files are not rewritten immediately; the engine
-- rebalances incrementally on subsequent writes + maintenance.
ALTER TABLE prod.silver.transactions CLUSTER BY (account_id, ingest_date);

-- Force the first clustering pass for a freshly converted table
OPTIMIZE prod.silver.transactions FULL;
Why this matters: the recurring 2 AM Slack thread of "can we re-partition this table?" goes away. Layout becomes a property you change with one DDL statement, not a multi-week rewrite project.
5. Maintenance: Predictive Optimization Replaces Cron-Driven OPTIMIZE/VACUUM
Predictive optimization is the part that retired the most legacy code in the pipelines I've migrated. Once enabled at the catalog or schema level, the engine monitors each table's read and write patterns and decides on its own when to compact files, re-cluster, vacuum, and refresh statistics. The big win isn't the operations themselves (the imperative pipeline could already run those); it's that the timing is driven by observed usage rather than by a schedule. Tables that get heavy ingestion get more frequent maintenance. Cold tables get left alone.
SQL
-- Turn it on at the catalog level once; new tables inherit.
ALTER CATALOG prod ENABLE PREDICTIVE OPTIMIZATION;

-- Or at the schema level for a phased rollout
ALTER SCHEMA prod.gold ENABLE PREDICTIVE OPTIMIZATION;

-- Inspect what the engine has been doing on a given table
SELECT
  operation,
  operationMetrics['numAddedFiles']   AS files_added,
  operationMetrics['numRemovedFiles'] AS files_removed,
  operationMetrics['numAddedBytes']   AS output_bytes,
  timestamp
FROM (DESCRIBE HISTORY prod.gold.daily_totals)
WHERE userMetadata IS NULL  -- heuristic only: skips commits tagged with custom user metadata
  AND (operation = 'OPTIMIZE' OR operation LIKE 'VACUUM%')  -- VACUUM is logged as VACUUM START / VACUUM END
  AND timestamp >= current_timestamp() - INTERVAL 7 DAYS
ORDER BY timestamp DESC;
What you should delete after enabling this: the nightly notebook that runs OPTIMIZE on every table in a schema, the VACUUM cron job, the ANALYZE TABLE wrapper, and the alerting that wakes someone up when those jobs run long. None of them are needed anymore, and leaving them on creates duplicate work that the engine and the cron will fight over.
6. Ingestion: Auto Loader Replaces Listing-Based File Detection
Auto Loader is the declarative answer to the perennial "which files have we processed already?" problem. Instead of listing a directory, comparing it to a state file, and figuring out the new bits, you describe the source location and the format and let the engine maintain its own incremental state. It discovers new files through directory listing by default or, in file notification mode, through cloud-native event services (for example, S3 event notifications or ADLS notifications), and the checkpoint is just another piece of state the engine owns.
Python
from pyspark.sql.functions import current_timestamp

# Streaming ingest from S3 with schema inference + evolution.
# Replaces hand-maintained checkpointing, listing logic, and
# whatever file-tracking table the team built two years ago.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", "s3://acme-checkpoints/txns_schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://landing/txns/")
    .withColumn("_ingest_ts", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://acme-checkpoints/txns_writer")
    .trigger(availableNow=True)  # batch-style; runs to completion
    .toTable("prod.bronze.txns"))
Two notes from production. First, schemaEvolutionMode is the option that prevents the silent-data-loss class of bugs when partner schemas change; pick the policy explicitly rather than relying on the default. Second, trigger(availableNow=True) gives you batch ergonomics on a streaming source: the job runs until it has consumed everything available and then exits, which is what most teams actually want for daily ingestion.
7. Transforms and Quality: Declarative Pipelines Replace Bare Spark + External DQ
The final piece is the transformation layer. Lakeflow pipelines (the rebrand of Delta Live Tables) let you declare each table as a Python or SQL definition and add expectations as a first-class concept. The engine derives the DAG from the dependencies and enforces the expectations on every write, so the data quality framework, the lineage layer, and the orchestration glue collapse into a single artifact.
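The expectation decorators differ only in what happens to a failing record: @dlt.expect keeps it and records the violation, @dlt.expect_or_drop removes it, and @dlt.expect_or_fail stops the update. As a plain-Python model of those three actions (illustrative only: apply_expectations and ExpectationViolation are hypothetical names, not part of the dlt API, and the real engine's evaluation order and metrics granularity may differ):

```python
class ExpectationViolation(Exception):
    """Raised when a row fails an expectation whose action is "fail"."""


def apply_expectations(rows, expectations):
    """Model of expect / expect_or_drop / expect_or_fail over a list of dict rows.

    expectations: list of (name, predicate, action), action in {"warn", "drop", "fail"}.
    Returns (kept_rows, metrics) where metrics[name] counts passed/failed rows.
    """
    metrics = {name: {"passed": 0, "failed": 0} for name, _, _ in expectations}
    kept = []
    for row in rows:
        keep = True
        for name, predicate, action in expectations:
            if predicate(row):
                metrics[name]["passed"] += 1
                continue
            metrics[name]["failed"] += 1
            if action == "fail":
                # Like expect_or_fail: one bad row aborts the whole update.
                raise ExpectationViolation(f"{name} failed for row {row!r}")
            if action == "drop":
                keep = False  # like expect_or_drop: the row is filtered out
            # "warn" (like plain expect): the row is kept; only the metric changes
        if keep:
            kept.append(row)
    return kept, metrics


rules = [
    ("non_null_amount", lambda r: r["amount"] is not None, "drop"),
    ("valid_currency", lambda r: r["currency"] in ("USD", "EUR", "GBP"), "fail"),
]
rows = [{"amount": 10, "currency": "USD"}, {"amount": None, "currency": "EUR"}]
kept, metrics = apply_expectations(rows, rules)
print(kept)  # only the USD row survives
print(metrics)
```

The passed/failed counters mirror the kind of per-expectation numbers the pipeline event log reports, which is what makes expectations usable for SLO dashboards.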
Python
import dlt
from pyspark.sql.functions import sum as _sum

@dlt.table(
    name="silver_txns",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "delta.tuneFileSizesForRewrites": "true",
    },
    cluster_by=["account_id", "ingest_date"],
)
@dlt.expect_or_drop("non_null_amount", "amount IS NOT NULL")
@dlt.expect_or_fail("valid_currency", "currency IN ('USD','EUR','GBP')")
@dlt.expect("has_txn_id", "txn_id IS NOT NULL")  # warn only; duplicates are removed by dropDuplicates below
def silver_txns():
    return (dlt.read_stream("bronze_txns")
            .dropDuplicates(["txn_id"]))

@dlt.table(name="gold_daily_totals")
def gold_daily_totals():
    return (dlt.read("silver_txns")
            .groupBy("ingest_date", "account_id", "region")
            .agg(_sum("amount").alias("daily_total")))
The decorators do four things at once: define the table, declare its layout (cluster_by), declare its quality rules, and let the engine infer that gold_daily_totals depends on silver_txns from the dlt.read call. There is no DAG file. There is no separate Great Expectations suite. Lineage is generated for free in Unity Catalog, including column-level edges.
If you want to query how the expectations have been performing (useful for SLO dashboards or alerting), the event log surfaces it directly:
SQL
-- Passed/failed counts for the first expectation in each flow_progress event,
-- plus total dropped records per event, last 24 hours
-- (explode the expectations array to report every expectation)
SELECT
  origin.flow_name AS flow_name,
  details:flow_progress.data_quality.expectations[0].name           AS exp_name,
  details:flow_progress.data_quality.expectations[0].passed_records AS passed,
  details:flow_progress.data_quality.expectations[0].failed_records AS failed,
  details:flow_progress.data_quality.dropped_records                AS dropped,
  timestamp
FROM event_log("<pipeline-id>")
WHERE event_type = 'flow_progress'
  AND timestamp >= current_timestamp() - INTERVAL 1 DAY
ORDER BY timestamp DESC;
8. Putting It Together: Where to Start, What to Measure
Adopting all of this at once is a recipe for pain.
The order I've seen work, and a small set of metrics to verify the change is paying off:
Step 1: Adopt predictive optimization at the schema level. Retire nightly OPTIMIZE / VACUUM jobs. Verify with the reduction in maintenance-cluster cost.
Step 2: Adopt liquid clustering on the top 5 tables. Retire static partitioning + Z-ORDER. Verify with p95 query latency on the same workloads.
Step 3: Adopt Auto Loader for 1-2 ingestion pipelines. Retire custom file-tracking + listing logic. Verify with end-to-end data freshness.
Step 4: Adopt Lakeflow pipelines for new pipelines only. Retire external DQ + DAG glue (for new work). Verify with lines of pipeline code per table.
Step 5: Adopt serverless compute for SQL warehouses + DLT. Retire hand-sized job clusters. Verify with cost-per-query and scale-up time.
What you do not need to migrate: imperative pipelines that already work and aren't growing. Declarative patterns are about new work and high-pain hot spots, not a heroic rewrite of every notebook ever shipped.
9. Honest Limitations and Where Imperative Still Wins
Four places where the declarative model still bites, worth knowing before you commit:
Procedural logic still belongs in Jobs. If your pipeline is really a sequence of API calls with branching error handling, that's a Lakeflow Job (or external code), not a declarative table. Don't try to bend dlt around it.
Predictive optimization needs observation time. On a table that's a week old, the engine hasn't seen enough patterns to make great decisions. For tables under heavy initial load, an explicit OPTIMIZE FULL after the first big ingest still helps.
Cluster-by-column choice still matters. CLUSTER BY AUTO is great for stable workloads with predictable filters. For tables whose access pattern is genuinely heterogeneous across teams, an explicit cluster-by based on the dominant query is usually faster.
Hint-driven escapes are still allowed. If a particular query benefits from a /*+ BROADCAST(t) */ hint and AQE isn't catching it, the hint is fine. Just keep them rare and document why.
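Step 2's check ("p95 query latency on the same workloads") is easy to get subtly wrong if the before and after samples are summarized differently. A minimal sketch, assuming you have exported per-query latencies in milliseconds for the same workload before and after the migration; p95 and p95_reduction are hypothetical helpers, and they use the nearest-rank percentile definition, which is one of several common definitions. The sample numbers are illustrative, not measurements.

```python
def p95(latencies_ms):
    """Nearest-rank 95th percentile of a non-empty sequence of latencies."""
    if not latencies_ms:
        raise ValueError("need at least one latency sample")
    ordered = sorted(latencies_ms)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) in integer arithmetic
    return ordered[rank - 1]  # rank is 1-based


def p95_reduction(before_ms, after_ms):
    """Fractional p95 reduction; positive means the 'after' samples are faster."""
    before, after = p95(before_ms), p95(after_ms)
    return (before - after) / before


# Illustrative samples for one workload, before and after liquid clustering.
before = [120, 130, 125, 900, 140, 135, 128, 132, 127, 131]
after = [90, 95, 92, 400, 99, 97, 94, 96, 93, 98]
print(p95(before), p95(after), round(p95_reduction(before, after), 3))
```

With only ten samples, the nearest-rank p95 is simply the slowest query, which is a reminder to collect enough runs per workload before drawing conclusions.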
Conclusion The declarative optimization story isn't a single feature you toggle — it's a quiet shift in who owns the boring parts of a Spark pipeline. Layout, maintenance, ingestion bookkeeping, plan tuning, cluster sizing, data quality enforcement: every one of those was traditionally a thing the team owned and paid for in toil. The current Databricks stack lets you express each as an intent and let the engine handle the operations underneath. Adopt them in order, retire what they replace, and the optimization treadmill slows from a daily concern to a quarterly review. That's the actual win, and it's the reason the declarative paradigm has gone from a Lakeflow detail to the default mental model for new pipelines on Databricks.

By Seshendranath Balla Venkata
Event-Driven Pipelines With Apache Pulsar and Go

A Practical Walkthrough
Most distributed systems eventually hit a wall with their messaging layer, whether it's Kafka's tight coupling between compute and storage, RabbitMQ's limited replay capabilities, or the operational overhead of managing multiple tools for queuing and streaming. Apache Pulsar was engineered to address these gaps from the ground up. In this article, we'll dissect a working Go-based demo that wires together a Pulsar producer, consumer, and Prometheus monitoring layer into a cohesive, observable messaging pipeline. The full source is on GitHub.
Why Pulsar Deserves a Closer Look
Pulsar's architecture makes a deliberate trade-off that most messaging systems avoid: it physically separates the broker tier (which handles routing, subscriptions, and protocol) from the storage tier (Apache BookKeeper, which handles persistence). This isn't just an implementation detail. It means you can scale message-routing capacity independently of your storage cluster, and vice versa.
Beyond the architecture, a few capabilities stand out for engineering teams:
Multi-tenancy at the protocol level – tenants, namespaces, and topics form a three-level hierarchy, making it practical to run a single Pulsar cluster for multiple teams or services without namespace collisions.
Four distinct subscription types – Exclusive, Shared, Failover, and Key_Shared give you precise control over how messages are distributed across consumer instances, something Kafka's consumer group model doesn't natively offer.
Cursor-based message retention – Pulsar tracks each subscription's position with a cursor and retains messages that a subscription has not yet acknowledged, rather than discarding them purely on a time-based retention window. A consumer that falls behind doesn't lose messages (subject to any backlog quota or message TTL configured on the namespace); it simply catches up from its last acknowledged position.
Native schema enforcement – The built-in schema registry lets the broker check producer and consumer schemas for compatibility, catching contract violations at the boundary rather than deep inside application logic.
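The tenant/namespace/topic hierarchy shows up directly in topic names. Short names such as the my-topic and topic-1 used in the demo resolve to the public tenant and default namespace, as in persistent://public/default/topic-1. A small sketch of that expansion, written as plain Python for illustration; qualify_topic and split_topic are hypothetical helpers (not part of any Pulsar client), and they deliberately accept only bare names or fully qualified names.

```python
def qualify_topic(name, tenant="public", namespace="default", persistent=True):
    """Expand a bare topic name into the scheme://tenant/namespace/topic form.

    Names that already contain "://" are returned unchanged.
    """
    if "://" in name:
        return name
    if "/" in name:
        raise ValueError(f"expected a bare topic name, got {name!r}")
    scheme = "persistent" if persistent else "non-persistent"
    return f"{scheme}://{tenant}/{namespace}/{name}"


def split_topic(fq_name):
    """Split a fully qualified topic name into (scheme, tenant, namespace, topic)."""
    if "://" not in fq_name:
        raise ValueError(f"not a fully qualified topic name: {fq_name!r}")
    scheme, rest = fq_name.split("://", 1)
    parts = rest.split("/", 2)
    if len(parts) != 3:
        raise ValueError(f"expected tenant/namespace/topic, got {rest!r}")
    tenant, namespace, topic = parts
    return scheme, tenant, namespace, topic


print(qualify_topic("topic-1"))
print(split_topic(qualify_topic("orders", tenant="payments", namespace="prod")))
```

Giving each team its own tenant and each environment its own namespace is what keeps two services that both pick a topic called orders from colliding on a shared cluster.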
What the Demo Builds
The project is structured as three independent Go binaries, each with a single responsibility:
Plain Text
├── producer/
│   ├── main.go        # HTTP server → Pulsar publisher
│   ├── go.mod
│   └── go.sum
├── consumer/
│   └── main.go        # Pulsar subscriber → message processor
├── monitor/
│   └── main.go        # Pulsar producer + Prometheus metrics server
├── prometheus.yml     # Scrape configuration
└── README.md
All three use `github.com/apache/pulsar-client-go/pulsar`, the official, Apache-maintained Go client. The client is not a thin wrapper; it implements the full Pulsar binary protocol, connection pooling, producer batching, and automatic Prometheus metric registration.
Component 1: The Producer
The producer exposes a single HTTP endpoint. An incoming HTTP request triggers a Pulsar publish operation, decoupling the caller from any direct knowledge of the messaging infrastructure.
Go
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/apache/pulsar-client-go/pulsar"
)

var client pulsar.Client

func main() {
	var err error
	client, err = pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
		msg := r.URL.Query().Get("msg")
		if err := publishMessage(msg); err != nil {
			// Report the failure to the caller instead of crashing the server.
			http.Error(w, "msg failed to publish", http.StatusInternalServerError)
			return
		}
		w.Write([]byte("msg successfully published"))
	})

	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

// publishMessage creates a short-lived producer per call (demo only; see below).
func publishMessage(msg string) error {
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "my-topic",
	})
	if err != nil {
		return err
	}
	defer producer.Close()

	// Publish the caller-supplied message body.
	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte(msg),
	})
	return err
}
A few architectural observations worth unpacking:
The HTTP-to-Pulsar bridge pattern is deliberately pragmatic.
Rather than requiring every upstream service to embed a Pulsar client, you expose a thin HTTP adapter. This is particularly valuable when integrating with systems that speak HTTP natively: CI/CD pipelines, third-party webhooks, or legacy services that can't easily adopt a new client library.
`pulsar.NewClient` creates a client that manages a pool of broker connections rather than a single TCP connection. The client maintains persistent connections to the broker and handles reconnection, load balancing across broker nodes, and TLS negotiation transparently. Calling `client.Close()` via `defer` releases the client's resources when `main` returns; note that `log.Fatal` calls `os.Exit`, which skips deferred calls.
`producer.Send` is a blocking call: it returns once the broker has acknowledged the message or an error occurs (`SendAsync` is the non-blocking alternative). The Pulsar client batches outgoing messages by default (configurable via `BatchingMaxMessages` and `BatchingMaxPublishDelay`), which can significantly improve throughput when many goroutines publish concurrently, without any changes to application code.
For production use, the producer instance should be created once at startup and reused across requests. Creating a new producer per request incurs connection overhead and bypasses the batching optimization entirely.
Component 2: The Consumer
The consumer subscribes to a topic and processes messages with explicit acknowledgment. The subscription type chosen here, `pulsar.Shared`, has meaningful implications for how the system scales.
Go
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "topic-1",
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

for i := 0; i < 10; i++ {
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Received message with Id: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
	consumer.Ack(msg)
}

if err := consumer.Unsubscribe(); err != nil {
	log.Fatal(err)
}
Subscription Semantics in Depth
Pulsar's subscription model is one of its most differentiating features.
Here's how the four types behave at the broker level:
Exclusive – The broker enforces that only one consumer holds the subscription at any time. A second consumer attempting to subscribe with the same name will receive an error. This guarantees strict message ordering but eliminates horizontal scaling.
Shared – The broker distributes messages across all active consumers in round-robin order. Any number of consumers can join or leave the subscription dynamically. This is the right choice for stateless workloads where processing order doesn't matter and throughput is the priority.
Failover – The broker designates one consumer as the active receiver. Others remain connected but idle, ready to take over if the active consumer disconnects. This preserves ordering while providing high availability, a pattern common in financial transaction processing.
Key_Shared – The broker routes messages with the same key consistently to the same consumer instance. This enables stateful processing (e.g., per-user session aggregation) without external coordination, as long as the consumer count remains stable.
Why Explicit Acknowledgment Matters
`consumer.Ack(msg)` signals to the broker that the message has been processed, so the subscription's cursor can move past it. If the consumer process crashes between `Receive` and `Ack`, the broker redelivers the message to another consumer in the subscription (or to the same consumer once it reconnects). This is the mechanism behind at-least-once delivery. For workloads that require exactly-once semantics, Pulsar supports transactions, in which acknowledgments and messages produced to other Pulsar topics are committed atomically; writes to external systems such as databases are not part of the transaction and still need to be idempotent. That's a more advanced topic, but the foundation is the same `Ack` call shown here.
Component 3: The Monitor
The monitor is architecturally the most interesting component. It runs two HTTP servers concurrently, one for the application endpoint and one for Prometheus metrics, and uses a Pulsar producer to generate observable traffic.
Go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"strconv"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // default Pulsar binary-protocol port
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Expose the collectors the Pulsar client registered with the default registry.
	http.Handle("/metrics", promhttp.Handler())

	prometheusPort := 2112
	go func() {
		if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
			log.Fatal(err)
		}
	}()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "topic-1",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	ctx := context.Background()
	webPort := 8082
	http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
		msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Payload: []byte("hello-world"),
		})
		if err != nil {
			// Report the failure instead of terminating the whole process.
			log.Printf("Publish failed: %v", err)
			http.Error(w, "publish failed", http.StatusInternalServerError)
			return
		}
		log.Printf("Published message: %v", msgId)
		fmt.Fprintf(w, "Message Published: %v", msgId)
	})

	if err := http.ListenAndServe(":"+strconv.Itoa(webPort), nil); err != nil {
		log.Fatal(err)
	}
}
How the Pulsar Client Registers Prometheus Metrics
When `pulsar.NewClient` is called, the Go client registers a set of Prometheus collectors with the default registerer, `prometheus.DefaultRegisterer`. No additional instrumentation code is required; the application only has to expose the registry over HTTP, which is what `http.Handle("/metrics", promhttp.Handler())` does. The metrics are then served at `/metrics` on the metrics port, in this case port `2112`. The metrics exposed include:
`pulsar_client_producers_opened` / `pulsar_client_producers_closed` – producer lifecycle counters
`pulsar_client_consumers_opened` / `pulsar_client_consumers_closed` – consumer lifecycle counters
`pulsar_client_messages_published_total` – cumulative publish count per topic
`pulsar_client_publish_latency_seconds` – histogram of end-to-end publish latency
`pulsar_client_bytes_published_total` – total bytes written to the broker
Running the Prometheus metrics server in a goroutine while the main goroutine handles the application HTTP server is idiomatic Go concurrency. Both servers share the same `http.DefaultServeMux`, so every registered route, including `/metrics` and `/produce`, is reachable on both ports; giving each server its own `http.ServeMux` keeps the metrics endpoint off the application port.
Prometheus Scrape Configuration
YAML
scrape_configs:
  - job_name: pulsar-client-go-metrics
    scrape_interval: 10s
    static_configs:
      - targets:
          - localhost:2112
The `scrape_interval: 10s` is a reasonable starting point for development. In production, you would typically align this with your alerting resolution requirements: a 30-second interval is common for dashboards, while 10 seconds or less is appropriate for latency-sensitive alerting rules. With these metrics flowing into Prometheus, you can build Grafana panels that surface producer throughput, consumer lag, and publish latency percentiles, the three signals that matter most when diagnosing messaging pipeline issues.
Running the Full Pipeline
Prerequisites
Apache Pulsar standalone
Go 1.18+
Prometheus (optional)
Startup Sequence
1. Launch Pulsar in standalone mode:
Shell
bin/pulsar standalone
2. Start the producer service:
Shell
cd producer && go run main.go
3. Start the consumer service:
Shell
cd consumer && go run main.go
4. Start the monitor service:
Shell
cd monitor && go run main.go
5. Start Prometheus:
Shell
prometheus --config.file=prometheus.yml
6. Publish a message via the producer endpoint:
Shell
curl "http://localhost:8080/publish?msg=test_pulsar_message_publish_event"
# msg successfully published
7. Trigger the monitor producer:
Shell
curl http://localhost:8082/produce
# Message Published: (messageId)
8. Inspect raw Prometheus metrics:
Shell
curl http://localhost:2112/metrics | grep pulsar
Engineering Takeaways
Decouple publish triggers from client library dependencies. The HTTP-to-Pulsar adapter pattern used in the producer is not just a demo convenience. It is a legitimate architectural boundary.
Services that need to emit events don't need to know anything about Pulsar's protocol, topic naming, or client configuration. They make an HTTP call; the adapter handles the rest.
Match subscription type to processing semantics, not just throughput. A common mistake is defaulting to `Shared` for everything because it scales horizontally. If your processing logic is stateful (for example, aggregating events per user session), `Key_Shared` gives you the same horizontal scalability while preserving per-key ordering without any application-level coordination.
Treat the acknowledgment boundary as your consistency boundary. Everything between `Receive` and `Ack` is your processing window. Any side effects (database writes, downstream API calls, cache updates) that happen in this window must be idempotent, because Pulsar will redeliver the message if the consumer fails before acknowledging. Design your processing logic around this constraint from the start, not as an afterthought.
Zero-cost observability is a genuine advantage. The fact that `pulsar-client-go` registers Prometheus metrics automatically means you get throughput, latency, and connection health data from the moment your application starts, without writing instrumentation code beyond exposing the `/metrics` endpoint. This is a meaningful operational advantage over client libraries that require manual metric registration.
Extending the Demo
The current implementation is intentionally minimal. Here are technically meaningful extensions worth exploring:
Schema enforcement – Replace raw `[]byte` payloads with Pulsar's schema-aware producer/consumer API. Using `pulsar.NewAvroSchema` or `pulsar.NewJSONSchema` registers the payload schema with the broker, which rejects producers and consumers whose schemas are incompatible, and serializing through the schema keeps malformed payloads from being published in the first place.
Dead-letter topic routing – Configure `DeadLetterPolicy` on the consumer to automatically route messages that exceed a maximum redelivery count to a separate topic.
This prevents poison-pill messages from blocking the subscription indefinitely.
Producer batching tuning – Set `BatchingMaxMessages`, `BatchingMaxSize`, and `BatchingMaxPublishDelay` on `ProducerOptions` to optimize the throughput/latency trade-off for your specific workload profile.
Graceful shutdown – Add `os/signal` handling to flush in-flight messages and close the producer cleanly before the process exits. By default, SIGINT and SIGTERM terminate a Go program without running deferred functions, so the current `defer client.Close()` only runs when `main` returns normally; SIGKILL cannot be intercepted at all.
Kubernetes-native deployment – Package each component as a container and deploy using the official Pulsar Helm chart. The producer and monitor can be exposed as Kubernetes Services; the consumer can run as a Deployment with HPA scaling based on the `pulsar_client_messages_published_total` metric exported to a custom metrics adapter.
Conclusion
The `apache-pulsar` project demonstrates that building a production-grade messaging pipeline with Apache Pulsar and Go doesn't require much code. It requires understanding the right abstractions. The producer-consumer-monitor triad covers the three concerns that matter in any event-driven system: getting data in, getting data out, and knowing what's happening in between. Pulsar's architecture (decoupled storage, flexible subscription semantics, and built-in observability) makes it a strong candidate for teams that have outgrown simpler messaging systems and need more precise control over delivery guarantees, scaling behavior, and operational visibility.
Source Code
https://github.com/shivik/apache-pulsar-demo

By Shivi Kashyap
Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka

After implementing contract-first integration across three different microservices architectures, I've learned that the biggest bottleneck in distributed systems isn't technical; it's coordination between teams. When Team A waits for Team B to finish their API before starting integration work, you're throwing away weeks of productivity.

Contract-first development flips this model. By defining your integration contracts upfront (OpenAPI specs, Avro schemas, database migrations), you enable teams to work in parallel, catch breaking changes early through CI validation, and treat contracts as the single source of truth. This isn't theoretical; this is how Netflix, Uber, and Amazon scale their engineering organizations.

In this article, I'll show you production-ready contract-first patterns using Java 21, Spring Boot 3, OpenAPI 3, Apache Kafka with Avro, and Flyway migrations. You'll see real code from a working system that handles the three critical integration boundaries: REST APIs, event-driven messaging, and database schemas.

The Problem: Why Traditional Integration Fails at Scale

When systems integrate without contracts, you hit three major problems:

1. Serial development bottlenecks: Team A builds an API endpoint. Team B waits. Team B builds a consumer. Team A discovers the payload doesn't match what Team B expected. Both teams spend days debugging mismatched assumptions.
2. Late discovery of breaking changes: You deploy a service update that changes a response field from customerId to customer_id. Your API consumers break in production. No tests caught it because there was no contract to validate against.
3. Documentation drift: The Swagger docs say the endpoint returns a 201. The actual code returns a 200. The integration tests expect a 404. Nobody knows which one is right because there's no single source of truth.
Contract-first development solves all three by making the contract the authoritative specification that generates code, mocks, tests, and documentation.

What Contract-First Actually Means

Contract-first means you define the integration boundary first (the contract) and then write code that conforms to it. The contract is not an afterthought or generated documentation; it's the design specification. A complete contract includes:

- Operations: Endpoints (REST), topics (Kafka), or tables (database)
- Data shapes: Request/response DTOs, event schemas, column definitions
- Validation rules: Required fields, constraints, data types
- Error model: HTTP status codes, error payloads, dead-letter queues
- Non-functional rules: Idempotency, retries, compatibility policies, SLAs

Here's the mental model: agree on the contract → generate tools → build independently → let CI enforce alignment.

Contract Type 1: REST API Contracts With OpenAPI

OpenAPI 3 specs are the gold standard for REST API contracts. You define endpoints, request/response schemas, validation rules, and error responses in YAML, then generate server stubs, client SDKs, mocks, and documentation from that single source.
OpenAPI Contract Example

Here's a production OpenAPI contract for an order management API:

YAML
openapi: 3.2.0
info:
  title: Orders API
  version: 1.0.0
  description: Contract-first REST API for order management
paths:
  /v1/orders:
    post:
      operationId: createOrder
      summary: Create a new order
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/CreateOrderRequest'
      responses:
        '201':
          description: Order created successfully
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/OrderResponse'
        '400':
          description: Validation error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
        '409':
          description: Idempotency conflict
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
  /v1/orders/{orderId}:
    get:
      operationId: getOrder
      parameters:
        - name: orderId
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Order found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/OrderResponse'
        '404':
          description: Order not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
components:
  schemas:
    CreateOrderRequest:
      type: object
      required: [customerId, items]
      properties:
        customerId:
          type: string
          example: CUST-123
        idempotencyKey:
          type: string
          description: Optional key for safe retries
        items:
          type: array
          minItems: 1
          items:
            $ref: '#/components/schemas/OrderItem'
    OrderItem:
      type: object
      required: [sku, quantity]
      properties:
        sku:
          type: string
          example: SKU-001
        quantity:
          type: integer
          minimum: 1
    OrderResponse:
      type: object
      required: [orderId, customerId, status, items, timestamp]
      properties:
        orderId:
          type: string
        customerId:
          type: string
        status:
          type: string
          enum: [CREATED, REJECTED]
        items:
          type: array
          items:
            $ref: '#/components/schemas/OrderItem'
        timestamp:
          type: string
          format: date-time
    ErrorResponse:
      type: object
      required: [code, message, traceId, timestamp]
      properties:
        code:
          type: string
          enum: [VALIDATION_ERROR, NOT_FOUND, CONFLICT, INTERNAL_ERROR]
        message:
          type: string
        traceId:
          type: string
        timestamp:
          type: string
          format: date-time

Implementing the Provider Side (Spring Boot)

The contract drives the implementation. Your Spring Boot controller implements what the contract specifies:

Java
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/orders")
@RequiredArgsConstructor
@Slf4j
public class OrderController {

    private final OrderService orderService;

    /**
     * POST /v1/orders
     * Contract: contracts/openapi/orders-api.v1.yaml
     */
    @PostMapping
    public ResponseEntity<OrderResponse> createOrder(
            @Valid @RequestBody CreateOrderRequest request) {
        log.info("Creating order for customer: {}", request.customerId());
        OrderResponse response = orderService.createOrder(request);
        // The contract specifies 201 Created for a successful POST
        return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(response);
    }

    /**
     * GET /v1/orders/{orderId}
     */
    @GetMapping("/{orderId}")
    public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) {
        // ResourceNotFoundException must be mapped to 404 + ErrorResponse by an
        // exception handler (not shown) to honor the contract
        OrderResponse response = orderService.getOrder(orderId)
                .orElseThrow(() -> new ResourceNotFoundException("Order not found: " + orderId));
        return ResponseEntity.ok(response);
    }
}

Critical implementation details:

- DTOs match contract schemas exactly: CreateOrderRequest and OrderResponse are Java records generated from or validated against the OpenAPI spec.
- HTTP status codes match the contract: 201 for creation, 404 for not found, 409 for idempotency conflicts.
- Validation is enforced: @Valid triggers Bean Validation on the request record, so the record's constraint annotations (for example, @NotBlank on customerId, @NotEmpty on items, @Min(1) on quantity) must mirror the OpenAPI constraints. openapi-generator can emit these annotations when it generates the DTOs.
- Error responses are standardized: all errors return ErrorResponse with a consistent structure.

Consumer Parallel Development

Here's where contract-first shines. While your team implements the provider, the consumer team can:

- Generate a Java client from the OpenAPI spec using openapi-generator
- Run a mock server that returns valid responses based on the contract
- Write integration tests against the mock
- Switch to the real service when it's ready (no code changes needed, only configuration such as the base URL)

The consumer doesn't wait for you to finish. They develop in parallel.
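The mock-server step can be sketched with nothing but the JDK's built-in com.sun.net.httpserver package. This is a hypothetical, hand-written stand-in for GET /v1/orders/{orderId}, not a mock generated from the spec: the class name OrdersMockServer, the fixed sample values, and the rule that IDs starting with MISSING return 404 are illustrative assumptions. Dedicated tools can serve mocks straight from the OpenAPI file; the point here is only that consumers can code against contract-shaped responses before the provider exists.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-written mock of GET /v1/orders/{orderId} from the Orders API contract
final class OrdersMockServer {

    private static final String PREFIX = "/v1/orders/";
    private static final String TIMESTAMP = "2024-01-01T00:00:00Z"; // fixed sample value

    // Starts the mock on the given port (0 picks a free port) and returns the running server
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext(PREFIX, OrdersMockServer::handle);
        server.start();
        return server;
    }

    private static void handle(HttpExchange exchange) throws IOException {
        if (!"GET".equals(exchange.getRequestMethod())) {
            exchange.sendResponseHeaders(405, -1); // only the GET operation is mocked
            exchange.close();
            return;
        }
        String orderId = exchange.getRequestURI().getPath().substring(PREFIX.length());
        boolean found = !orderId.isEmpty() && !orderId.contains("/") && !orderId.startsWith("MISSING");

        // Bodies follow the OrderResponse and ErrorResponse schemas; IDs are not JSON-escaped,
        // which is acceptable for simple sample IDs in a mock
        String body = found
                ? "{\"orderId\":\"" + orderId + "\",\"customerId\":\"CUST-123\",\"status\":\"CREATED\","
                  + "\"items\":[{\"sku\":\"SKU-001\",\"quantity\":1}],\"timestamp\":\"" + TIMESTAMP + "\"}"
                : "{\"code\":\"NOT_FOUND\",\"message\":\"Order not found: " + orderId + "\","
                  + "\"traceId\":\"mock-trace\",\"timestamp\":\"" + TIMESTAMP + "\"}";

        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.getResponseHeaders().set("Content-Type", "application/json");
        exchange.sendResponseHeaders(found ? 200 : 404, bytes.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(bytes);
        }
    }
}
```

Consumer tests would call OrdersMockServer.start(0), read the bound port from server.getAddress().getPort(), and point the generated client at it; switching to the real service then only changes the base URL.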
Contract Type 2: Event Contracts With Kafka and Avro

Event-driven systems need two contract layers: topic semantics (human-readable) and schema definitions (machine-validated).

Topic Semantics Contract

Document the operational contract for each topic:

Markdown
## Topic: orders.order-created.v1

- **Purpose**: Emitted when an order is created successfully
- **Key**: orderId (partition affinity per order)
- **Delivery**: At-least-once (consumers must be idempotent)
- **Consumer requirement**: Deduplicate by eventId
- **Retry policy**: Consumer retries transient errors
- **DLQ**: orders.order-created.v1.dlq for poison messages
- **Compatibility**: Backward compatible schema evolution required

Avro Schema Contract

The Avro schema is your machine-validated contract:

JSON
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.acme.events",
  "doc": "Event emitted when an order is successfully created",
  "fields": [
    { "name": "eventId", "type": "string", "doc": "Unique event ID for idempotent processing" },
    { "name": "occurredAt", "type": "string", "doc": "ISO 8601 timestamp" },
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    {
      "name": "source",
      "type": ["null", "string"],
      "default": null,
      "doc": "Order source (WEB, MOBILE, API). Nullable for backward compatibility."
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "sku", "type": "string"},
            {"name": "quantity", "type": "int"}
          ]
        }
      }
    }
  ]
}

Key pattern: The source field is nullable with a default value. The default is what makes the change backward compatible: new consumers can read old events that lack the field and simply get null. Adding the field is also forward compatible, because old consumers ignore fields their schema doesn't declare, so old consumers can read new events too.
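The rule behind the nullable-with-default pattern can be checked mechanically. The sketch below is a simplified, hypothetical model (FieldSpec and BackwardCompatibility are illustrative names, not Schema Registry's API): under Avro's schema resolution, a field present only in the reader's schema is filled from its default, so a new schema is backward compatible with an old one only if every added field declares a default. Type changes, aliases, and nested records are deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified model of one Avro record field: its name and whether it declares a default
record FieldSpec(String name, boolean hasDefault) {}

final class BackwardCompatibility {

    // BACKWARD: a consumer on newSchema must read data written with oldSchema.
    // Fields only in the new (reader) schema are filled from their defaults, so each needs one.
    // Fields removed from the new schema are simply skipped by the reader.
    // Not checked here: type changes, aliases, nested records.
    static List<String> violations(List<FieldSpec> oldSchema, List<FieldSpec> newSchema) {
        Set<String> oldNames = oldSchema.stream().map(FieldSpec::name).collect(Collectors.toSet());
        List<String> problems = new ArrayList<>();
        for (FieldSpec field : newSchema) {
            if (!oldNames.contains(field.name()) && !field.hasDefault()) {
                problems.add("New field '" + field.name() + "' has no default");
            }
        }
        return problems;
    }
}
```

Comparing an OrderCreated schema without source against a version that adds source with its null default yields no violations; adding source without the default yields one.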
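Kafka Producer Implementation

Java
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import com.acme.events.OrderCreated; // generated from the Avro schema (stringType=String assumed)

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishOrderCreated(OrderCreated event) {
        // orderId is the record key, so all events for one order land on the same partition
        String key = event.getOrderId();
        log.debug("Publishing OrderCreated: orderId={}, eventId={}", key, event.getEventId());

        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("orders.order-created.v1", key, event);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Published OrderCreated: orderId={}, partition={}, offset={}",
                        key,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                log.error("Failed to publish OrderCreated: orderId={}", key, ex);
            }
        });
    }
}

Production concerns addressed:

- Key-based partitioning: using orderId as the key sends all events for the same order to the same partition, which preserves per-order ordering as long as the topic's partition count doesn't change.
- Async publishing with callbacks: non-blocking publish with explicit success/failure handling.
- Structured logging: captures partition and offset for troubleshooting.

Kafka Consumer With Idempotency

At-least-once delivery means duplicates are possible. Consumers must deduplicate:

Java
// Method on a Spring-managed listener bean in billing-service
@KafkaListener(topics = "orders.order-created.v1", groupId = "billing-service")
@Transactional // invoice and processed-event record commit or roll back together
public void onOrderCreated(OrderCreated event) {
    // Check if already processed
    if (processedEventsRepository.existsByEventId(event.getEventId())) {
        log.debug("Skipping duplicate event: {}", event.getEventId());
        return;
    }

    // Process event
    billingService.createInvoice(
            event.getOrderId(),
            event.getCustomerId(),
            event.getItems()
    );

    // Mark as processed; a unique constraint on eventId rejects a concurrent duplicate
    processedEventsRepository.save(
            new ProcessedEvent(event.getEventId(), Instant.now())
    );
}

Idempotency pattern: Check eventId before processing and record it after processing, so a redelivered event is skipped. This only holds if the invoice and the processed-event record are written in the same database transaction and eventId has a unique constraint. Otherwise, a crash between createInvoice and save, or two concurrent deliveries of the same event, can still produce a duplicate invoice.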
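The requirement that the dedup record and the side effect succeed or fail together can be sketched without Kafka or a database. In this hypothetical in-memory version (InMemoryBillingStore and createInvoiceOnce are illustrative names), a synchronized method stands in for the database transaction and Set.add stands in for a unique constraint on eventId: the ID is claimed and the invoice written as one step, so neither a redelivery nor a concurrent duplicate produces a second invoice.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory stand-in for the billing database: processed event IDs plus the invoices table.
// The lock makes claim-and-write atomic for concurrent callers; a real database transaction
// would additionally cover crashes between the two writes.
final class InMemoryBillingStore {

    private final Set<String> processedEventIds = new HashSet<>();
    private final List<String> invoices = new ArrayList<>();

    // Returns true if an invoice was created, false if the event was a duplicate
    synchronized boolean createInvoiceOnce(String eventId, String orderId) {
        // Set.add returns false when the ID is already present, like a unique-constraint violation
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        invoices.add("invoice-for-" + orderId);
        return true;
    }

    synchronized int invoiceCount() {
        return invoices.size();
    }
}
```

The weak spot this guards against is the gap between checking and recording: when the check and the write are separate, uncoordinated steps, a crash or a concurrent duplicate can slip through.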
Architecture Diagram: Contract-First Flow

Figure 1: Contract-first development flow showing parallel team development

Note: Solid arrows represent generation or implementation flow. Dashed arrows represent validation relationships.

Contract Type 3: Database Contracts With Flyway

Database schemas are contracts, too. Flyway migrations let you version and evolve schemas with the same contract-first approach.
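The migrations in this section follow Flyway's versioned naming convention, V<version>__<description>.sql, and Flyway applies them in version order rather than file-name order, which is why a V10 migration runs after V2 even though it sorts before it alphabetically. The simplified parser below illustrates that ordering rule; it is a hypothetical sketch (MigrationOrder and its methods are illustrative names), not Flyway's implementation, and it ignores repeatable (R__) and undo migrations.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified parser for Flyway-style versioned migration file names: V<version>__<description>.sql.
// Version parts may be separated by '.' or '_', so V1_1__fix.sql is version 1.1.
final class MigrationOrder {

    private static final Pattern NAME = Pattern.compile("V([0-9]+(?:[._][0-9]+)*)__(.+)\\.sql");

    // Extracts the numeric version parts, e.g. "V1_1__fix.sql" -> [1, 1]
    static List<BigInteger> version(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a versioned migration: " + fileName);
        }
        List<BigInteger> parts = new ArrayList<>();
        for (String part : m.group(1).split("[._]")) {
            parts.add(new BigInteger(part));
        }
        return parts;
    }

    // Compares part by part, numerically; a missing trailing part counts as 0 in this sketch
    static int compareVersions(List<BigInteger> a, List<BigInteger> b) {
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            BigInteger x = i < a.size() ? a.get(i) : BigInteger.ZERO;
            BigInteger y = i < b.size() ? b.get(i) : BigInteger.ZERO;
            int cmp = x.compareTo(y);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    // Returns the files in the order they would be applied (version order, not alphabetical)
    static List<String> sorted(List<String> fileNames) {
        List<String> result = new ArrayList<>(fileNames);
        result.sort((f1, f2) -> compareVersions(version(f1), version(f2)));
        return result;
    }
}
```

Fed V10__add_index.sql, V2__add_order_source.sql, V1__create_orders.sql, V1_1__fix_status_length.sql, and V3__enforce_source_not_null.sql (the V10 and V1_1 names are hypothetical), the sketch orders them V1, V1_1, V2, V3, V10.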
Base Schema Migration

File: V1__create_orders.sql

SQL
CREATE TABLE orders (
    id          VARCHAR(32) PRIMARY KEY,
    customer_id VARCHAR(32) NOT NULL,
    status      VARCHAR(16) NOT NULL,
    created_at  TIMESTAMP   NOT NULL DEFAULT now()
);

CREATE TABLE order_items (
    order_id VARCHAR(32) NOT NULL REFERENCES orders(id),
    sku      VARCHAR(64) NOT NULL,
    quantity INT         NOT NULL CHECK (quantity > 0),
    PRIMARY KEY (order_id, sku)
);

CREATE INDEX idx_orders_customer_id ON orders(customer_id);

Schema Evolution: Expand/Migrate/Contract

When you need to add a column without breaking old code, use the expand/migrate/contract pattern:

Expand (V2__add_order_source.sql): add the column as nullable, so code that doesn't know about it keeps working.

SQL
ALTER TABLE orders ADD COLUMN source VARCHAR(32);

Migrate: deploy application code that writes source on every insert, then backfill existing rows:

SQL
-- Backfill for existing rows
UPDATE orders SET source = 'UNKNOWN' WHERE source IS NULL;

Contract (V3__enforce_source_not_null.sql):

SQL
-- After all systems produce source
ALTER TABLE orders ALTER COLUMN source SET NOT NULL;

This three-phase approach lets you deploy schema changes without downtime, because each phase ships separately and every intermediate state works with both the old and the new application version.

CI/CD Enforcement: Making Contracts Real

Contracts only work if they're enforced.
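Here are the CI gates that prevent breaking changes:

REST API: OpenAPI Breaking Change Detection

Run openapi-diff in CI to catch breaking changes. The diff tool compares two files, so first extract the version of the spec that is on main:

YAML
# .github/workflows/api-contract-check.yml (step excerpt)
- name: Check for API breaking changes
  run: |
    git fetch origin main
    git show origin/main:contracts/openapi/orders-api.v1.yaml > /tmp/orders-api.base.yaml
    # Flag names vary between openapi-diff implementations; check the tool you install
    npx openapi-diff \
      /tmp/orders-api.base.yaml \
      contracts/openapi/orders-api.v1.yaml \
      --fail-on-breaking

This fails the build if you:

- Remove required fields
- Change field types
- Remove endpoints
- Change HTTP status codes

Kafka: Schema Registry Compatibility Check

Schema Registry enforces compatibility on the server side: each subject has a compatibility level (BACKWARD by default) that is checked whenever a new schema version is registered. On the client side, point the serializers at the registry:

Properties files
# Spring Boot passes spring.kafka.properties.* to every Kafka client it creates
spring.kafka.properties.schema.registry.url=http://schema-registry:8081

# Register schemas from the pipeline, not from applications;
# serialize with the latest registered version of the subject
spring.kafka.producer.properties.auto.register.schemas=false
spring.kafka.producer.properties.use.latest.version=true

With auto-registration disabled, new schema versions are registered through the pipeline, where the registry rejects an incompatible version before it reaches production. If you leave auto.register.schemas=true instead, the producer only attempts registration when it serializes its first record, so an incompatible schema fails on that first send, not at startup. To catch it in CI, test the new schema against the registry's compatibility REST endpoint or with the Schema Registry Maven plugin's test-compatibility goal.

Database: Flyway Validation

Flyway validates migrations on startup:

Properties files
spring.flyway.validate-on-migrate=true
spring.flyway.baseline-on-migrate=false

Both values are Spring Boot's defaults; setting them explicitly documents the intent. If a migration script has been edited after it was applied (checksum mismatch), or an applied migration is missing from the codebase, validation fails and the deployment stops. Validation compares migration scripts with Flyway's schema history table; it does not detect manual changes made directly to the database schema, so all schema changes must go through migrations.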
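The check Flyway performs on startup can be pictured as a comparison between the checksums recorded in its schema history table and the scripts in the repository. Below is a hypothetical, simplified version of that comparison (MigrationValidator and its methods are illustrative names): it uses java.util.zip.CRC32 over the raw script bytes, whereas Flyway's own checksum calculation also normalizes the script text before hashing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical sketch of checksum-based migration validation in the spirit of Flyway's
// validate step: compare each applied migration's recorded checksum with the script on disk.
final class MigrationValidator {

    // CRC32 of the script text, stored as an int (no line-ending or BOM normalization here)
    static int checksum(String script) {
        CRC32 crc = new CRC32();
        crc.update(script.getBytes(StandardCharsets.UTF_8));
        return (int) crc.getValue();
    }

    // appliedChecksums: version -> checksum recorded when the migration ran (schema history)
    // localScripts: version -> current script content in the repository
    static List<String> validate(Map<String, Integer> appliedChecksums, Map<String, String> localScripts) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, Integer> applied : appliedChecksums.entrySet()) {
            String script = localScripts.get(applied.getKey());
            if (script == null) {
                errors.add("Applied migration " + applied.getKey() + " is missing locally");
            } else if (checksum(script) != applied.getValue()) {
                errors.add("Checksum mismatch for migration " + applied.getKey());
            }
        }
        return errors;
    }
}
```

Nothing in this comparison inspects the live database: a manual change made directly to a table would go unnoticed, while an edited or deleted script is caught.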
Integration Flow Diagram

Figure 2: End-to-end flow showing REST → DB → Kafka integration with contracts enforced at each boundary

Real-World Benefits: Production Implementation Experience

After implementing contract-first across multiple services (orders, billing, inventory), the teams involved consistently observed significant improvements:

Integration quality:
- Substantially fewer integration bugs reaching production
- Most remaining bugs are business logic issues, not contract mismatches
- Clear contracts eliminate ambiguity in integration expectations

Development velocity:
- Integration cycles measured in weeks rather than months
- Consumer teams start integration work immediately instead of waiting for the provider to finish
- Mock servers enable realistic integration testing without coordination delays

Operational reliability:
- CI-enforced contract validation keeps breaking changes out of production
- Breaking changes are caught during PR review, not after deployment
- Automated validation ensures compatibility before merge

Documentation accuracy:
- Documentation generated from OpenAPI specs stays synchronized with the implementation by design
- Swagger UI reflects actual API behavior, because both derive from the same contract
- No separately maintained documentation to keep in sync by hand

Common Pitfalls and How to Avoid Them

Pitfall 1: Treating Contracts as Documentation
Wrong approach: Write code first, generate OpenAPI from annotations later.
Right approach: Write OpenAPI first, then generate server stubs from it or validate the implementation against it.

Pitfall 2: Skipping CI Validation
Wrong approach: Trust developers to manually check compatibility.
Right approach: Automate breaking change detection in CI. Make it a required check.

Pitfall 3: No Schema Evolution Strategy
Wrong approach: Add fields without defaults, so consumers on the new schema can no longer read older events.
Right approach: All new fields must be optional or have defaults. Test with multiple schema versions.
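Testing against multiple schema versions, and the automated gate from Pitfall 2, come down to diffing an old and a new schema under explicit rules. This toy model is hypothetical and is not the logic of openapi-diff or any other tool (Property and BreakingChangeCheck are illustrative names); it covers only flat schemas and two rules, removed or retyped response fields and newly required request fields, which is enough to show why renaming customerId to customer_id fails the check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal model of one property in a flat object schema
record Property(String type, boolean required) {}

// Toy breaking-change rules for two OpenAPI-style schemas (old = main, new = pull request)
final class BreakingChangeCheck {

    // Response schemas: clients read these fields, so removing or retyping any of them
    // is treated as breaking (the conservative choice)
    static List<String> checkResponse(Map<String, Property> oldSchema, Map<String, Property> newSchema) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Property> e : new TreeMap<>(oldSchema).entrySet()) {
            Property now = newSchema.get(e.getKey());
            if (now == null) {
                problems.add("Response field removed: " + e.getKey());
            } else if (!now.type().equals(e.getValue().type())) {
                problems.add("Response field type changed: " + e.getKey());
            }
        }
        return problems;
    }

    // Request schemas: existing clients don't send new fields, so a newly required field,
    // or a changed type, breaks them
    static List<String> checkRequest(Map<String, Property> oldSchema, Map<String, Property> newSchema) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Property> e : new TreeMap<>(newSchema).entrySet()) {
            Property before = oldSchema.get(e.getKey());
            if (e.getValue().required() && (before == null || !before.required())) {
                problems.add("Request field newly required: " + e.getKey());
            }
            if (before != null && !before.type().equals(e.getValue().type())) {
                problems.add("Request field type changed: " + e.getKey());
            }
        }
        return problems;
    }
}
```

Sorting the keys with a TreeMap keeps the reported problems in a stable order, which makes CI output easy to compare between runs.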
Pitfall 4: Ignoring Idempotency
Wrong approach: Assume exactly-once delivery end to end. Kafka's exactly-once semantics cover reads and writes within Kafka, not side effects such as database writes or HTTP calls.
Right approach: Design consumers to deduplicate by eventId. Assume at-least-once.

Pitfall 5: Coupling Contracts to Implementation
Wrong approach: Expose internal database schema directly in API contracts.
Right approach: Design contracts for consumers, not internal convenience. Use DTOs that map between the external contract and the internal model.

When Contract-First Makes Sense

Contract-first isn't always the answer. Use it when:

✅ Multiple teams integrate: the coordination cost justifies the upfront contract design effort
✅ Public APIs or partner integrations: external consumers need stability and clear documentation
✅ Microservices architecture: services must evolve independently without breaking dependents
✅ High change frequency: CI validation catches breaking changes early when changes are frequent

Skip contract-first when:

❌ Single small team: coordination overhead is low, so formal contracts add friction
❌ Prototyping: you're exploring the problem space and expect major pivots
❌ Internal tools with one consumer: the provider and consumer are maintained by the same person

Key Takeaways

- Contracts enable parallel development: provider and consumer teams work simultaneously instead of serially
- CI validation prevents breaking changes: automate OpenAPI diffs, schema compatibility checks, and migration validation
- Idempotency is not optional: at-least-once delivery in Kafka requires consumers to deduplicate by event ID
- Schema evolution requires backward compatibility: use nullable fields with defaults to support gradual rollouts
- Contracts are design specifications, not afterthoughts: define contracts first, generate code from them

What's Next?
If you're implementing contract-first integration:

1. Start with one integration: pick your most painful cross-team dependency.
2. Write the OpenAPI spec or Avro schema first, before any implementation.
3. Set up CI validation for that contract (openapi-diff or Schema Registry).
4. Measure the reduction in integration bugs and coordination overhead.
5. Expand to other integrations once you've proven the pattern.

Contract-first isn't a silver bullet, but it turns integration from a coordination nightmare into a governance problem that CI can solve. When you're coordinating three teams across two time zones, that shift makes the difference between shipping in weeks and shipping in months.

Full source code: github.com/wallaceespindola/contract-first-integrations

Related reading:
- OpenAPI 3.0 Specification: spec.openapis.org
- Confluent Schema Registry: docs.confluent.io/platform/current/schema-registry
- Flyway Documentation: flywaydb.org/documentation
- Mastering Contract-First API Development: moesif.com/blog
- Research on Microservices Issues: arxiv.org

Need more tech insights? Check out my GitHub, LinkedIn, and Speaker Deck. Happy coding!

By Wallace Espindola

Top Big Data Experts


Miguel Garcia

VP of Engineering,
Factorial

Miguel has a great background in leading teams and building high-performance solutions for the retail sector. An advocate of platform design as a service and data as a product.

Gautam Goswami

Founder,
DataView

Enthusiastic about learning and sharing knowledge on Big Data, Data Science & related headways, including data streaming platforms, through the knowledge-sharing platform Dataview.in. Presently serving as Head of Engineering & Data Streaming at Irisidea TechSolutions, Bangalore, India. https://www.irisidea.com/gautam-goswami/

Ram Ghadiyaram

Vice President - Fintech/ Cloud /Bigdata / Analytics / AI & ML,
JPMorgan Chase & Co.

Fintech | Cloud | Big Data Analytics | AI & ML Expert. Venkata Ram Anjaneya Prasad Gadiyaram (aka Ram Ghadiyaram) is a seasoned Cloud, Big Data analytics, and AI/ML expert, mentor, and innovator. Open source lover :-)

The Latest Big Data Topics

From Polling to PubSub: Building an Asynchronous OPC UA Stack in Python
The architectural design and engineering required to build a native, asynchronous OPC UA Pub/Sub (IEC 62541-14) stack in Python for the open-source opcua-asyncio library.
July 3, 2026
by Harshith Narasimhan Srivatsa
· 490 Views
Real-Time AI Feature Engineering With Spark Structured Streaming and Databricks Feature Store
How Spark Structured Streaming and the Databricks Feature Store work together to build point-in-time-correct features from Kafka events to streaming transformations.
July 2, 2026
by Jubin Abhishek Soni DZone Core CORE
· 658 Views
Dead Letter Queue Patterns in Apache Flink: Handling Poison Messages Without Stopping Your Stream
A poison message can trap a Flink job in a restart loop. Use side outputs, retries, tiered DLQs, durable sinks, and replay jobs to keep the stream running.
July 2, 2026
by Rohit Muthyala
· 749 Views
Apache Spark Query Optimization on Databricks: Catalyst, AQE, and Photon Engine
Spark query performance on Databricks is driven by a multi-layer optimization stack: Catalyst transforms SQL into optimized execution plans.
July 2, 2026
by Jubin Abhishek Soni DZone Core CORE
· 695 Views
Fine-Tuning LLMs at Scale With Databricks MLflow and Spark
Learn how Databricks, Apache Spark, MLflow, and Hugging Face Transformers work together to create an end-to-end fine-tuning platform.
June 30, 2026
by Jubin Abhishek Soni DZone Core CORE
· 1,025 Views
Data Pipeline Observability: Why Your AI Model Fails in Production
Your machine learning model had 95% accuracy in testing, but crashes in production. The problem isn't the model, it's your data pipeline.
June 26, 2026
by Abhilash Rao Mesala
· 1,029 Views · 1 Like
Implementing Asynchronous Communication Between Microservices Using Kafka and Spring Boot
Kafka decouples services, buffers spikes, and routes failures to a DLT. Schemas are contracts; consumers must be idempotent.
June 24, 2026
by Mallikharjuna Manepalli
· 1,713 Views
Data Governance Checklist for AI-Driven Systems
A practical checklist for evaluating AI data readiness, covering data quality, governance, lineage, access controls, retrieval systems, and ongoing monitoring.
June 23, 2026
by Abhishek Gupta DZone Core CORE
· 1,267 Views · 2 Likes
The Real-Time Revolution: Why Blockchain Needs Data Stream Processing
Blockchain and data streaming are bringing unprecedented levels of security, transparency, and real-time mechanisms to move data across the digital world.
June 17, 2026
by Gautam Goswami DZone Core CORE
· 1,464 Views
Parallel Kafka Batch Processing With Kotlin Coroutines in Spring Boot
Learn how Kotlin Coroutines improve Spring Boot Kafka batch processing with parallel execution, resource throttling, and faster database operations.
June 16, 2026
by Erkin Karanlık
· 2,241 Views · 1 Like
From ETL to Lakeflow: Shifting to a Declarative Data Paradigm
The article focuses on moving away from traditional, "imperative" ETL processes to a modern, "declarative" approach using the Databricks Lakeflow platform.
June 15, 2026
by Seshendranath Balla Venkata
· 1,934 Views · 1 Like
Rust-Native Alternatives to Spark SQL and DataFrame Workloads
Sail is an open-source computation framework that serves as a drop-in replacement for Apache Spark (SQL and DataFrame API) in both single-host and distributed settings.
June 11, 2026
by Srinivasarao Rayankula
· 1,956 Views · 2 Likes
Combining Temporal and Kafka for Resilient Distributed Systems
Kafka handles durable event streaming while Temporal manages long-running workflow state, retries, and recovery to build resilient distributed systems.
June 9, 2026
by Akhil Madineni
· 1,774 Views · 1 Like
The Big Data Architecture Blueprint: Core Storage, Integration, and Governance Patterns
This comprehensive technical guide breaks down the essential architectural, storage, and integration patterns required to scale enterprise big data platforms.
June 8, 2026
by Ram Ghadiyaram, DZone Core
· 1,784 Views · 1 Like
Is the Data Warehouse Dead? 3 Patterns From Enterprise Architecture That Answer This Question
No, but its role has fundamentally changed. Here is what I have seen work after building data platforms at enterprise scale across multiple industries.
June 5, 2026
by Nabarun Bandyopadhyay
· 4,019 Views · 1 Like
Building Threat Intelligence Pipelines Using Python, APIs, and Elasticsearch
Ingest STIX/TAXII feeds, normalize to ECS, and preserve provenance; deterministic IDs, correct bulk writes, and ingest pipelines keep threat indicator data reliable and queryable under load.
June 3, 2026
by Krishnaveni Musku
· 3,068 Views
Optimizing Databricks Spark Pipelines Using Declarative Patterns
This article explains why hand-tuning Spark is becoming the slow path — and what the declarative alternatives actually look like in production.
June 1, 2026
by Seshendranath Balla Venkata
· 1,349 Views · 1 Like
Event-Driven Pipelines With Apache Pulsar and Go
Build scalable, real-time pipelines with Apache Pulsar and Go using event-driven producers and consumers that communicate via Pulsar topics.
May 29, 2026
by Shivi Kashyap
· 2,952 Views
Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka
Define API, event, and DB contracts upfront to enable parallel development, catch breaking changes in CI, and maintain consistent, reliable integrations.
May 29, 2026
by Wallace Espindola
· 2,618 Views
Building a Zero-Cost Approval Workflow With AWS Lambda Durable Functions
Learn how to build an ETL pipeline with human-in-the-loop approval that costs nothing while waiting — and see real cost data from processing 1,000 documents.
May 28, 2026
by Harpreet Siddhu
· 4,477 Views · 1 Like

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]
