Problem: ShopHero, a small e-commerce company, wants to build a real-time data platform that ensures only valid, high-quality customer orders flow into downstream systems. The platform should catch invalid orders, enrich valid orders with product details, store results for analytics, and replicate data to another region for disaster recovery.


ShopHero – Real-Time Order Guardrails

High-Level Architecture

(Architecture diagram: ShopHero.drawio)

1. What is this platform?

A real-time data processing pipeline built with Spring Boot, Apache Kafka, Apache Flink, Kafka Connect, MinIO, Postgres, and MirrorMaker 2. This project demonstrates how to ingest, validate, enrich, store, and replicate data across clusters using modern event-driven architecture patterns.

The flow:

  • OrderGenerator (via Postman) → Kafka topic orders
  • Postgres products table → Kafka topic products (via JDBC Source Connector)
  • Flink joins orders with products, validates & enriches → enriched-orders (valid) or dlt-orders (rejected)
  • enriched-orders → MinIO via S3 Sink Connector
  • orders and enriched-orders → replicated to the west cluster via MM2
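
For reference, an Order value schema consistent with the test order in Step 9 might look like the following. The namespace and exact field types are assumptions; the real .avsc lives with the OrderGenerator sources:

{
  "type": "record",
  "name": "Order",
  "namespace": "com.shophero.avro",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "productId", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "price", "type": "double"},
    {"name": "currency", "type": "string"}
  ]
}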


2. High-level components

  • Order generator (Spring Boot)

    • Exposes a REST endpoint (POST /orders)
    • Produces Avro Order events to topic orders
  • Postgres

    • Holds products table
  • Kafka Connect (east)

    • JDBC Source: Postgres products table → Kafka topic products
    • S3/MinIO Sink: enriched-orders → MinIO bucket
  • Flink job – OrderGuardrailJob

    • Consumes orders + products
    • Validates and enriches orders (see the sketch after this list)
    • Writes valid to enriched-orders, rejects to dlt-orders
  • Schema Registry

    • Stores Avro schemas for Order (v1, v2, …) and Product
    • Used by producer, Connect, and Flink
  • Security (east)

    • Kafka has a SASL_PLAINTEXT listener with PLAIN mechanism
    • JAAS defines admin, producer, flink, connect users
    • ACLs restrict access to topics & consumer groups
  • Kafka west + MM2

    • Second Kafka cluster + MM2 process
    • Replicates orders and enriched-orders from east → west
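
To make the guardrail step concrete, here is a minimal Java sketch of the valid/invalid split using a Flink side output. It is illustrative only: the Order POJO, its getters, and the validation rules are assumptions; only the topic roles come from this project.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Illustrative sketch – not the actual OrderGuardrailJob code.
public class GuardrailSketch {

    // Side output the job would sink to the dlt-orders topic
    static final OutputTag<Order> DLT = new OutputTag<Order>("dlt-orders") {};

    static SingleOutputStreamOperator<Order> split(DataStream<Order> orders) {
        return orders.process(new ProcessFunction<Order, Order>() {
            @Override
            public void processElement(Order o, Context ctx, Collector<Order> out) {
                // Example guardrails; the real job's rules may differ
                if (o.getProductId() != null && o.getQuantity() > 0 && o.getPrice() > 0) {
                    out.collect(o);      // valid → continues to enrichment, then enriched-orders
                } else {
                    ctx.output(DLT, o);  // invalid → dlt-orders
                }
            }
        });
    }
}

The valid stream would then be keyed by productId, joined with the products stream (for example via connect plus a KeyedCoProcessFunction that caches the latest product per key), and written to enriched-orders, while split(orders).getSideOutput(DLT) is written to dlt-orders.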

3. Prerequisites

  • Docker
  • Java 17
  • Maven 3.9.*
  • Postman (Optional)

4. Run the project

Installation

Step 1 – Clone the Repository

git clone <your-repo-url>
cd ShopHero

Step 2 – Build Java Components

# Build Spring Boot OrderGenerator
cd OrderGenerator
mvn clean package

# Build Flink streaming job
cd ../FlinkJob
mvn clean package

# Return to repo root
cd ..

After building, you should have:

  • OrderGenerator/target/...jar – Spring Boot API
  • FlinkJob/target/flink-job-1.0-SNAPSHOT.jar – Flink job JAR

Step 3 – Start the Platform

Start Kafka (east and west), Schema Registry, Connect, Control Center, Flink, MinIO, and Postgres:

cd Platform
docker compose up -d

Verify containers:

docker ps

This brings up:

  • kafka-east, kafka-west, schema-registry-east
  • connect-east, control-center
  • postgres, minio
  • flink-jobmanager, flink-taskmanager
  • mm2-east-to-west

Step 4 – Apply Kafka ACLs

cd kafka
chmod +x acls.sh
./acls.sh
cd ..

This script uses the Kafka admin user to:

  • Grant the producer user Write, Create, and Describe access to orders
  • Grant the flink user access to orders, products, enriched-orders, dlt-orders
  • Grant the connect user access to internal Connect topics and consumer groups
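
As an illustration, the grant for the producer user plausibly looks like the kafka-acls call below; the SASL port and the admin --command-config path are assumptions, so see acls.sh for the real invocation:

docker exec kafka-east kafka-acls \
  --bootstrap-server kafka-east:29094 \
  --command-config /etc/kafka/admin.properties \
  --add --allow-principal User:producer \
  --operation Write --operation Create --operation Describe \
  --topic orders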

⚠️ If you change usernames, passwords, or topic names, update Platform/kafka/acls.sh and jaas.conf accordingly.
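
For orientation, a PLAIN-mechanism broker jaas.conf typically has the shape below. The usernames match the list above; the passwords are placeholders:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_producer="producer-secret"
  user_flink="flink-secret"
  user_connect="connect-secret";
};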


Step 5 – Pre-create Kafka Topics (Optional)

Avoid consumer startup errors by pre-creating the topics:

# Create 'orders' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic orders --partitions 3 --replication-factor 1

# Create 'products' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic products --partitions 3 --replication-factor 1

# Create 'enriched-orders' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic enriched-orders --partitions 3 --replication-factor 1

# Create 'dlt-orders' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic dlt-orders --partitions 3 --replication-factor 1
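
Confirm the topics exist:

docker exec -it kafka-east kafka-topics \
  --bootstrap-server kafka-east:29092 \
  --list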

Step 6 – Register Kafka Connect Connectors

cd Platform/connect

# Register JDBC Source Connector (Postgres → Kafka)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data @jdbc-postgres-products.json

# Register MinIO Sink Connector (Kafka → MinIO)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data @enriched-orders-minio-sink.json

cd ../..

Verify in Control Center: http://localhost:9021
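
For reference, jdbc-postgres-products.json follows the standard Confluent JDBC source connector layout. A minimal shape might look like this; the connector name, connection URL, credentials, and mode here are assumptions (the Postgres credentials are taken from the URL table below), so check the actual file:

{
  "name": "jdbc-postgres-products",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/shophero",
    "connection.user": "shophero",
    "connection.password": "shophero",
    "table.whitelist": "products",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": ""
  }
}

You can also list registered connectors and check their status via the Connect REST API:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/jdbc-postgres-products/status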


Step 7 – Submit the Flink Job

# Copy JAR to JobManager
docker exec -it flink-jobmanager mkdir -p /opt/flink/usrlib
  
docker cp FlinkJob/target/flink-job-1.0-SNAPSHOT.jar \
  flink-jobmanager:/opt/flink/usrlib/flink-job.jar

# Submit Flink job
docker exec -it flink-jobmanager \
  flink run \
  -c com.shophero.flink.OrderGuardrailJob \
  /opt/flink/usrlib/flink-job.jar

Monitor via Flink Dashboard: http://localhost:8082
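
You can also confirm the job is RUNNING via the Flink REST API (served on the same port as the dashboard):

curl http://localhost:8082/jobs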


Step 8 – Run the OrderGenerator App

cd OrderGenerator
mvn spring-boot:run

  • Produces Avro orders to Kafka
  • Uses Schema Registry
  • Authenticates using SASL credentials in application.yml
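
The Kafka client settings in application.yml typically take the shape below. The property keys are standard Kafka/Spring Boot settings; the password is a placeholder, so check the actual file:

spring:
  kafka:
    bootstrap-servers: localhost:29094
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer-secret";
      schema.registry.url: http://localhost:8081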

Step 9 – Send a Test Order

Use Postman or curl:

POST http://localhost:8080/orders
Content-Type: application/json

{
  "orderId": "order-101",
  "customerId": "customer-1",
  "productId": "P-100",
  "quantity": 2,
  "price": 19.99,
  "currency": "USD"
}
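
The same request as a curl command:

curl -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"orderId":"order-101","customerId":"customer-1","productId":"P-100","quantity":2,"price":19.99,"currency":"USD"}'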

Flow:

  • Spring Boot → Kafka (orders)
  • Flink reads orders + products → validates, enriches
  • Valid → enriched-orders; Invalid → dlt-orders
  • Kafka Connect → MinIO
  • MirrorMaker 2 → replicate to Kafka West
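
To inspect the results, consume the output topics with the Avro console consumer shipped in the Schema Registry container (the internal registry address is an assumption; adjust to your compose network):

docker exec -it schema-registry-east kafka-avro-console-consumer \
  --bootstrap-server kafka-east:29092 \
  --topic enriched-orders \
  --property schema.registry.url=http://schema-registry-east:8081 \
  --from-beginning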

Component URLs

Component                 URL
Control Center            http://localhost:9021
Schema Registry           http://localhost:8081
Kafka Connect REST        http://localhost:8083
Flink Dashboard           http://localhost:8082
MinIO Console             http://localhost:9001
MinIO S3 Endpoint         http://localhost:9000
Postgres DB               localhost:5432 (shophero/shophero)
Kafka East (PLAINTEXT)    localhost:29092
Kafka East (SASL)         localhost:29094

Log in to MinIO at http://localhost:9001 using credentials from docker-compose.yml. Check the bucket used in enriched-orders-minio-sink.json.


MirrorMaker 2

MirrorMaker 2 replicates topics (orders, enriched-orders) from Kafka East to Kafka West using the mm2-east-to-west.properties configuration.
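
A minimal shape for mm2-east-to-west.properties is shown below. The property keys are standard MirrorMaker 2 settings; the replication policy line is an assumption, inferred from the fact that the verification step below consumes the un-prefixed orders topic on west (the default policy would name it east.orders):

clusters = east, west
east.bootstrap.servers = kafka-east:29092
west.bootstrap.servers = kafka-west:29092

east->west.enabled = true
east->west.topics = orders|enriched-orders

# Assumption: keeps source topic names instead of the default east. prefix
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy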

Verify replication:

docker exec -it kafka-west \
  kafka-topics \
  --bootstrap-server kafka-west:29092 \
  --list
  
docker exec -it kafka-west \
  kafka-console-consumer \
  --bootstrap-server kafka-west:29092 \
  --topic orders \
  --from-beginning

Cleanup

cd Platform
docker compose down            # Stop containers
docker compose down -v         # Optional: remove volumes

Stop the Spring Boot app with Ctrl+C.
