High-Level Architecture
A real-time data processing pipeline built with Spring Boot, Apache Kafka, Apache Flink, Kafka Connect, MinIO, Postgres, and MirrorMaker 2. This project demonstrates how to ingest, validate, enrich, store, and replicate data across clusters using modern event-driven architecture patterns.
The flow:

- OrderGenerator (via Postman) → Kafka `orders`
- Postgres `products` table → Kafka `products` (via JDBC Source Connector)
- Flink joins `orders` with `products`, validates & enriches → `enriched-orders` or `dlt-orders`
- `enriched-orders` → MinIO via S3 Sink Connector → replicated to west via MM2
- Order generator (Spring Boot)
  - Exposes REST endpoint (e.g. `POST /orders`)
  - Produces Avro `Order` events to topic `orders`
- Postgres
  - Holds `products` table
- Kafka Connect (east)
  - JDBC Source: `products` SQL table → Kafka topic `products`
  - S3/MinIO Sink: `enriched-orders` → MinIO bucket
- Flink job – `OrderGuardrailJob`
  - Consumes `orders` + `products`
  - Validates / enriches orders
  - Writes valid orders to `enriched-orders`, rejects to `dlt-orders`
- Schema Registry
  - Stores Avro schemas for `Order` (v1, v2, …) and `Product`
  - Used by the producer, Connect, and Flink
- Security (east)
  - Kafka has a SASL_PLAINTEXT listener with the PLAIN mechanism
  - JAAS defines `admin`, `producer`, `flink`, and `connect` users
  - ACLs restrict access to topics & consumer groups (see the client config sketch after this list)
- Kafka west + MM2
  - Second Kafka cluster + MM2 process
  - Replicates `orders` and `enriched-orders` from east → west
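Clients connecting to the SASL listener need PLAIN credentials matching `jaas.conf`. Below is a minimal client config sketch; the `producer-secret` password is a placeholder, so substitute the actual values from `Platform/kafka/jaas.conf`:

```bash
# Placeholder credentials -- use the producer user's actual password
# from Platform/kafka/jaas.conf
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="producer" \
  password="producer-secret";
EOF

# Smoke-test the SASL listener (29094 per the port table below; this
# assumes the same port is reachable inside the Docker network)
docker cp client.properties kafka-east:/tmp/client.properties
docker exec -it kafka-east kafka-console-producer \
  --bootstrap-server kafka-east:29094 \
  --producer.config /tmp/client.properties \
  --topic orders
```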
Prerequisites

- Docker
- Java 17
- Maven 3.9.*
- Postman (optional)
```bash
git clone <your-repo-url>
cd ShopHero

# Build Spring Boot OrderGenerator
cd OrderGenerator
mvn clean package

# Build Flink streaming job
cd ../FlinkJob
mvn clean package

# Return to repo root
cd ..
```

After building, you should have:

- `OrderGenerator/target/...jar` – Spring Boot API
- `FlinkJob/target/flink-job.jar` – Flink job JAR
Start Kafka (east and west), Schema Registry, Connect, Control Center, Flink, MinIO, and Postgres:
```bash
cd Platform
docker compose up -d
```

Verify containers:

```bash
docker ps
```

This brings up:
- kafka-east, kafka-west, schema-registry-east
- connect-east, control-center
- postgres, minio
- flink-jobmanager, flink-taskmanager
- mm2-east-to-west
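To confirm Postgres is up and the `products` table is seeded, query it directly. A sketch, assuming the `shophero` user/password from the service table below and a database also named `shophero` (adjust to your `docker-compose.yml`):

```bash
# Database name is an assumption -- check docker-compose.yml
docker exec -it postgres \
  psql -U shophero -d shophero -c 'SELECT * FROM products LIMIT 5;'
```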
```bash
cd kafka
chmod +x acls.sh
./acls.sh
cd ..
```

This script uses the Kafka admin user to:

- Grant the `producer` user write, create, and describe access to `orders`
- Grant the `flink` user access to `orders`, `products`, `enriched-orders`, and `dlt-orders`
- Grant `connect` access to internal Connect topics and groups

⚠️ If you change usernames, passwords, or topic names, update `Platform/kafka/acls.sh` and `jaas.conf` accordingly.
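For reference, the kind of grant `acls.sh` issues looks roughly like this (a sketch; the `--command-config` path is an assumption, and the real script may batch several grants):

```bash
# Sketch of a single grant, similar to what acls.sh runs.
# The admin client config path is hypothetical -- mirror the script.
docker exec -it kafka-east kafka-acls \
  --bootstrap-server kafka-east:29092 \
  --command-config /etc/kafka/admin.properties \
  --add --allow-principal User:producer \
  --operation Write --operation Create --operation Describe \
  --topic orders
```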
Avoid consumer startup errors by pre-creating the topics:
```bash
# Create 'orders' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic orders --partitions 3 --replication-factor 1

# Create 'products' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic products --partitions 3 --replication-factor 1

# Create 'enriched-orders' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic enriched-orders --partitions 3 --replication-factor 1

# Create 'dlt-orders' topic
docker exec -it kafka-east kafka-topics --bootstrap-server kafka-east:29092 \
  --create --topic dlt-orders --partitions 3 --replication-factor 1
```
```bash
# Register the connectors (still in Platform/ from the compose step)
cd connect

# Register JDBC Source Connector (Postgres → Kafka)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data @jdbc-postgres-products.json

# Register MinIO Sink Connector (Kafka → MinIO)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  --data @enriched-orders-minio-sink.json

# Back to the repo root
cd ../..
```

Verify in Control Center: http://localhost:9021
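You can also confirm registration over the Connect REST API. The connector names come from the `name` fields inside the two JSON files, so substitute them below:

```bash
# List registered connectors
curl -s http://localhost:8083/connectors

# Check a connector's status -- replace <connector-name> with the
# "name" field from jdbc-postgres-products.json or
# enriched-orders-minio-sink.json
curl -s http://localhost:8083/connectors/<connector-name>/status
```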
```bash
# Copy JAR to JobManager (run from the repo root)
docker exec -it flink-jobmanager mkdir -p /opt/flink/usrlib
docker cp FlinkJob/target/flink-job-1.0-SNAPSHOT.jar \
  flink-jobmanager:/opt/flink/usrlib/flink-job.jar

# Submit Flink job
docker exec -it flink-jobmanager \
  flink run \
  -c com.shophero.flink.OrderGuardrailJob \
  /opt/flink/usrlib/flink-job.jar
```

Monitor via the Flink Dashboard: http://localhost:8082
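To confirm the job was accepted, list jobs from the Flink CLI; `OrderGuardrailJob` should appear in the RUNNING state:

```bash
# Lists scheduled and running jobs on the cluster
docker exec -it flink-jobmanager flink list
```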
```bash
cd OrderGenerator
mvn spring-boot:run
```

- Produces Avro orders to Kafka
- Uses Schema Registry
- Authenticates using SASL credentials in `application.yml`
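Once you send the first order (next step), the `Order` schema should be registered. Subject names depend on the serializer's naming strategy; with the default TopicNameStrategy you would expect `orders-value`:

```bash
# List registered subjects
curl -s http://localhost:8081/subjects

# Inspect the latest Order schema (assumes the default TopicNameStrategy)
curl -s http://localhost:8081/subjects/orders-value/versions/latest
```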
Use Postman or curl:
```
POST http://localhost:8080/orders
Content-Type: application/json

{
  "orderId": "order-101",
  "customerId": "customer-1",
  "productId": "P-100",
  "quantity": 2,
  "price": 19.99,
  "currency": "USD"
}
```

Flow:
- Spring Boot → Kafka (`orders`)
- Flink reads `orders` + `products` → validates, enriches
- Valid → `enriched-orders`; invalid → `dlt-orders`
- Kafka Connect → MinIO
- MirrorMaker 2 → replicates to Kafka West
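To watch the pipeline output directly, tail the two result topics on the internal plaintext listener. The values are Avro-encoded, so the plain console consumer prints raw bytes; use `kafka-avro-console-consumer` instead if your image includes it:

```bash
# Enriched (valid) orders
docker exec -it kafka-east kafka-console-consumer \
  --bootstrap-server kafka-east:29092 \
  --topic enriched-orders --from-beginning

# Rejected orders (dead-letter topic)
docker exec -it kafka-east kafka-console-consumer \
  --bootstrap-server kafka-east:29092 \
  --topic dlt-orders --from-beginning
```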
| Component | URL |
|---|---|
| Control Center | http://localhost:9021 |
| Schema Registry | http://localhost:8081 |
| Kafka Connect REST | http://localhost:8083 |
| Flink Dashboard | http://localhost:8082 |
| MinIO Console | http://localhost:9001 |
| MinIO S3 Endpoint | http://localhost:9000 |
| Postgres DB | localhost:5432 (shophero/shophero) |
| Kafka East (Plain) | localhost:29092 |
| Kafka East (SASL) | localhost:29094 |
Log in to MinIO at http://localhost:9001 using the credentials from `docker-compose.yml`. Check the bucket used in `enriched-orders-minio-sink.json`.
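You can also list the bucket from the command line with the MinIO client. The access keys and bucket name below are placeholders for the values in `docker-compose.yml` and the sink config:

```bash
# Placeholders: substitute the MinIO root user/password from
# docker-compose.yml and the bucket from enriched-orders-minio-sink.json.
# --network host may not work on Docker Desktop; use the console instead.
docker run --rm --network host minio/mc sh -c \
  "mc alias set local http://localhost:9000 <access-key> <secret-key> && \
   mc ls --recursive local/<bucket-name>"
```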
MirrorMaker 2 replicates topics (`orders`, `enriched-orders`) from Kafka East to Kafka West using the `mm2-east-to-west.properties` configuration.
Verify replication:
```bash
docker exec -it kafka-west \
  kafka-topics \
  --bootstrap-server kafka-west:29092 \
  --list

docker exec -it kafka-west \
  kafka-console-consumer \
  --bootstrap-server kafka-west:29092 \
  --topic orders \
  --from-beginning
```

To stop the platform:

```bash
cd Platform
docker compose down      # Stop containers
docker compose down -v   # Optional: remove volumes
```

Stop the Spring Boot app with Ctrl+C.