# Spring Integration
This page explains how to wire pulsaride-transform into a Spring Boot application: which beans to declare, which are provided by the library, and how the two modes (BATCH and CDC) differ.
## The integration pattern
pulsaride-transform is a library, not a framework that generates Spring Batch jobs for you. Your application provides three things; the library provides everything else:
| Your application provides | The library provides |
|---|---|
| A target `DataSource` bean | `MigrationLoader` — parses YAML configs |
| A `CommandLineRunner` (batch) or Kafka consumer (CDC) that drives the loop | `PulsarideEventProcessor` — applies filter + field mappings + expressions per row |
| YAML migration configs (one per table) | `CsvRowReader` / `JdbcRowReader` — source adapters |
| | `JdbcRowWriter` — upsert into PostgreSQL (dialect-aware) |
| | `DeferralEngine` — defers rows with unresolved FK references, replays them when the referenced row arrives |
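To make the left column concrete, here is the rough shape of a migration config. Only the keys this page references elsewhere (`name`, `target_table`, `sources`, `cdc_format`) are real; the commented stanzas are placeholders for the filter and field-mapping rules, whose exact syntax is defined by the library's config reference:

```yaml
# migration/01-products.yaml (illustrative shape only)
name: products
target_table: products
sources:
  - type: csv            # csv | jdbc
# filter: ...            # row filter applied by PulsarideEventProcessor
# fields: ...            # field mappings + expressions per target column
```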
## BATCH mode — minimal setup

Three Java files are required; no Spring Batch `Job` or `Step` beans are needed.
### application.yml

```yaml
spring:
  datasource:
    # Standard Spring Boot datasource — auto-configures the DataSource bean
    url: jdbc:postgresql://localhost:5432/targetdb
    username: ${DB_USER}
    password: ${DB_PASS}
  main:
    web-application-type: none   # no HTTP server for batch

migration:
  tables:
    - table: products
      config: migration/01-products.yaml
    - table: orders
      config: migration/02-orders.yaml
  batch-size: 500
```
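The runner below injects a `MigrationProperties` bean that binds the `migration.*` keys above. A minimal sketch, assuming this is a class you define yourself (registered via `@ConfigurationPropertiesScan` on your main class, as in the `MigrationApp` example later on this page, or via `@EnableConfigurationProperties(MigrationProperties.class)`):

```java
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "migration")
public class MigrationProperties {

    private List<TableEntry> tables = new ArrayList<>();
    private int batchSize = 500;   // binds migration.batch-size

    public List<TableEntry> getTables() { return tables; }
    public void setTables(List<TableEntry> tables) { this.tables = tables; }
    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }

    /** One entry under migration.tables. */
    public static class TableEntry {
        private String table;
        private String config;

        public String getTable() { return table; }
        public void setTable(String table) { this.table = table; }
        public String getConfig() { return config; }
        public void setConfig(String config) { this.config = config; }
    }
}
```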
### MigrationRunner.java (the complete wiring)

```java
@Component
public class MigrationRunner implements CommandLineRunner {

    private final DataSource dataSource;
    private final MigrationProperties props;

    public MigrationRunner(DataSource dataSource, MigrationProperties props) {
        this.dataSource = dataSource;
        this.props = props;
    }

    @Override
    public void run(String... args) throws Exception {
        JdbcRowWriter writer = new JdbcRowWriter(dataSource);

        for (var entry : props.getTables()) {
            // 1. Load YAML → typed config
            MigrationConfig config = MigrationLoader.load(Path.of(entry.getConfig()));

            // 2. Build processor: applies filter + field rules per row
            var processor = new PulsarideEventProcessor(
                    config, new DebeziumEventNormalizer());

            // 3. Read source rows (CSV or JDBC); readCsv is a helper you supply
            List<Map<String, Object>> rows = readCsv(config);

            // 4. Transform and write
            for (var row : rows) {
                Map<String, Object> out = processor.process(row, null);
                if (out != null) { // null = filtered out
                    writer.upsertAll(config.getTarget_table(),
                            List.of(out), List.of("id"));
                }
            }
        }
    }
}
```

## Two-datasource setup (Oracle → PostgreSQL)
When reading from a JDBC source (Oracle, MySQL, etc.) you need two separate DataSource beans. Spring Boot's auto-configuration only creates one, so you declare them explicitly:
```java
@Configuration
public class DataSourceConfig {

    // Target (PostgreSQL) — always required. Marked @Primary so that
    // plain DataSource injection points (e.g. MigrationRunner) still
    // resolve without a qualifier once a second DataSource exists.
    @Primary
    @Bean("targetDataSource")
    @ConfigurationProperties("spring.datasource.target")
    public DataSource targetDataSource() {
        return DataSourceBuilder.create().build();
    }

    // Source (Oracle) — only required when reading via JDBC
    @Bean("sourceDataSource")
    @ConditionalOnProperty("spring.datasource.source.url")
    @ConfigurationProperties("spring.datasource.source")
    public DataSource sourceDataSource() {
        return DataSourceBuilder.create().build();
    }

    // Fallback: reuse target if no source is configured (CSV-only mode)
    @Bean("sourceDataSource")
    @ConditionalOnMissingBean(name = "sourceDataSource")
    public DataSource sourceDataSourceFallback(
            @Qualifier("targetDataSource") DataSource target) {
        return target;
    }
}
```

```yaml
# application.yml — two-datasource variant
spring:
  datasource:
    source:
      url: jdbc:oracle:thin:@//oracle-host:1521/ORCL
      username: ${SOURCE_USER}
      password: ${SOURCE_PASS}
      driver-class-name: oracle.jdbc.OracleDriver
    target:
      url: jdbc:postgresql://pg-host:5432/targetdb
      username: ${TARGET_USER}
      password: ${TARGET_PASS}
```

Then in your migration YAML, set `sources[].type: jdbc` and supply a `table:` or a custom `sql:` query.
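For illustration, the source stanza of such a migration YAML might look like this (the key names are the ones this page references; the exact schema comes from the library's config reference):

```yaml
sources:
  - type: jdbc
    table: ORDERS     # read the whole table, or instead supply:
    # sql: SELECT * FROM ORDERS WHERE STATUS = 'OPEN'
```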
## `@EnablePulsarideTransform` — the full framework

For production workloads that need health monitoring, a Dead Letter Queue, a run registry, and CDC mode, add `@EnablePulsarideTransform` to your main class:
```java
@SpringBootApplication
@EnablePulsarideTransform
@ConfigurationPropertiesScan
public class MigrationApp {

    public static void main(String[] args) {
        SpringApplication.run(MigrationApp.class, args);
    }
}
```

This annotation activates additional beans based on `pulsaride.mode`:
| `pulsaride.mode` | Beans activated |
|---|---|
| `BATCH` (default) | `RunRegistry`, `MigrationEngine`, `MigrationMetrics`, `DlqService`, `PulsarideHealthIndicator` |
| `CDC` | `RunRegistry`, `DlqService`, `PulsarideHealthIndicator`, `pulsarideEventProcessors` — `MigrationEngine` is not activated (no source DB required) |
```yaml
# application.yml — when using @EnablePulsarideTransform
pulsaride:
  mode: BATCH                 # BATCH (default) | CDC
  registry-dir: .pulsaride/   # where run history is stored
```
The healthcare-migration-service example uses `@EnablePulsarideTransform` and demonstrates the full stack: deferred FK resolution, health endpoints, a Kafka CDC consumer, and the monitoring dashboard.
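The health endpoints ride on standard Spring Boot Actuator; assuming `spring-boot-starter-actuator` is on the classpath and the app runs as a web service (e.g. in CDC mode), the usual exposure config is all that is needed:

```yaml
management:
  endpoints:
    web:
      exposure:
        include: health
  endpoint:
    health:
      show-details: always   # include per-indicator details, e.g. PulsarideHealthIndicator's
```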
## CDC mode — event-driven
In CDC mode the library processes row-level change events (INSERT / UPDATE / DELETE) emitted by Debezium. Your application provides a Kafka consumer that calls the processor per event. The library handles normalisation, transformation, upsert/delete routing, and circuit-breaking.
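A minimal sketch of such a consumer, using Spring Kafka and Jackson. Everything library-specific here is an assumption: the topic name, the key column, the second argument to `process(...)` (mirrored from the batch example), and the injection of a single per-table processor (the full framework registers these as `pulsarideEventProcessors`). `CdcRowProcessor.java` in the healthcare-migration-service example shows the actual contract, including the delete routing this sketch omits:

```java
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
// pulsaride-transform imports omitted; package names vary

@Component
public class OrdersCdcConsumer {

    private final PulsarideEventProcessor processor;  // per-table processor
    private final JdbcRowWriter writer;
    private final ObjectMapper mapper = new ObjectMapper();

    public OrdersCdcConsumer(PulsarideEventProcessor processor, JdbcRowWriter writer) {
        this.processor = processor;
        this.writer = writer;
    }

    @KafkaListener(topics = "dbserver1.public.orders")   // assumed Debezium topic
    public void onEvent(String payload) throws Exception {
        // Debezium emits the change event as JSON; parse it to a generic map
        Map<String, Object> event =
                mapper.readValue(payload, new TypeReference<Map<String, Object>>() {});

        // Normalisation + filter + field rules; null means the row was filtered out
        Map<String, Object> out = processor.process(event, null);
        if (out != null) {
            writer.upsertAll("orders", List.of(out), List.of("id"));
        }
    }
}
```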
```yaml
# Set in application-cdc.yml
pulsaride:
  mode: CDC

# Per-table config: specify the CDC event format
# (default is "debezium"; set cdc_format in the migration YAML)
# name: orders
# cdc_format: debezium        # debezium | flat | passthrough | custom
```
See the healthcare-migration-service example (`CdcRowProcessor.java` and `application-cdc.yml`) for a complete working CDC setup.