Spring Integration

This page explains how to wire pulsaride-transform into a Spring Boot application: which beans to declare, which are provided by the library, and how the two modes (BATCH and CDC) differ.

The integration pattern

pulsaride-transform is a library, not a framework that generates Spring Batch jobs for you. Your application provides three things; the library provides everything else:

Your application provides:
- A target DataSource bean
- A CommandLineRunner (batch) or Kafka consumer (CDC) that drives the loop
- YAML migration configs (one per table)

The library provides:
- MigrationLoader — parses YAML configs
- PulsarideEventProcessor — applies filter + field mappings + expressions per row
- CsvRowReader / JdbcRowReader — source adapters
- JdbcRowWriter — upsert into PostgreSQL (dialect-aware)
- DeferralEngine — defers rows with unresolved FK references, replays them when the referenced row arrives
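
For orientation, here is a sketch of a per-table migration config. The real schema is whatever MigrationLoader accepts: apart from target_table and sources[].type, which appear elsewhere on this page, the key names below (filter, fields, expression) are illustrative assumptions, not the library's documented schema.

# migration/01-products.yaml (illustrative; verify key names against MigrationLoader)
target_table: products            # read by the batch runner via config.getTarget_table()
sources:
  - type: csv                     # csv | jdbc (see the two-datasource section)
filter: "status == 'ACTIVE'"      # rows failing the filter are dropped (the processor returns null)
fields:
  - source: PROD_NAME             # rename a source column
    target: name
  - target: imported_at
    expression: "now()"           # or compute a value with an expression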

BATCH mode — minimal setup

Three Java files are required (the @SpringBootApplication main class, the MigrationProperties holder, and the MigrationRunner below); there are no Spring Batch Job or Step beans to declare.

application.yml

spring:
  datasource:
    # Standard Spring Boot datasource — auto-configures the DataSource bean
    url:      jdbc:postgresql://localhost:5432/targetdb
    username: ${DB_USER}
    password: ${DB_PASS}
  main:
    web-application-type: none   # no HTTP server for batch

migration:
  tables:
    - table:  products
      config: migration/01-products.yaml
    - table:  orders
      config: migration/02-orders.yaml
  batch-size: 500

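The runner below injects a MigrationProperties bean that binds the migration.* block above. It is not in the library-provides list, so it is assumed here to be your own class; a minimal sketch, with accessor names inferred from how the runner uses it (register it via @ConfigurationPropertiesScan or @EnableConfigurationProperties):

MigrationProperties.java

@ConfigurationProperties(prefix = "migration")
public class MigrationProperties {

    private List<TableEntry> tables = new ArrayList<>();
    private int batchSize = 500;

    public List<TableEntry> getTables()            { return tables; }
    public void setTables(List<TableEntry> tables) { this.tables = tables; }
    public int  getBatchSize()                     { return batchSize; }
    public void setBatchSize(int batchSize)        { this.batchSize = batchSize; }

    public static class TableEntry {
        private String table;    // target table name
        private String config;   // path to the per-table migration YAML

        public String getTable()               { return table; }
        public void   setTable(String table)   { this.table = table; }
        public String getConfig()              { return config; }
        public void   setConfig(String config) { this.config = config; }
    }
}
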
MigrationRunner.java (the complete wiring)

@Component
public class MigrationRunner implements CommandLineRunner {

    private final DataSource          dataSource;
    private final MigrationProperties props;

    public MigrationRunner(DataSource dataSource, MigrationProperties props) {
        this.dataSource = dataSource;
        this.props      = props;
    }

    @Override
    public void run(String... args) throws Exception {
        JdbcRowWriter writer = new JdbcRowWriter(dataSource);

        for (var entry : props.getTables()) {
            // 1. Load YAML → typed config
            MigrationConfig config = MigrationLoader.load(Path.of(entry.getConfig()));

            // 2. Build processor: applies filter + field rules per row
            var processor = new PulsarideEventProcessor(
                config, new DebeziumEventNormalizer());

            // 3. Read source rows (CSV here; see the two-datasource section for JDBC).
            //    readCsv is your own helper — see the sketch below.
            List<Map<String, Object>> rows = readCsv(config);

            // 4. Transform and write (one row at a time for clarity; chunk by
            //    props.getBatchSize() in real use)
            for (var row : rows) {
                Map<String, Object> out = processor.process(row, null);
                if (out != null) {   // null = row was filtered out
                    writer.upsertAll(config.getTarget_table(),
                                     List.of(out), List.of("id"));
                }
            }
        }
    }
}
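
In production you would likely read via the library's CsvRowReader, but since its exact API is not shown on this page, here is a plain-Java stand-in that loads a header row plus data rows into maps. The getSource_path() accessor is hypothetical; substitute whatever your MigrationConfig exposes for the CSV location.

private static List<Map<String, Object>> readCsv(MigrationConfig config) throws IOException {
    // getSource_path() is hypothetical: point this at your config's CSV path
    List<String> lines = Files.readAllLines(Path.of(config.getSource_path()));
    String[] header = lines.get(0).split(",");

    List<Map<String, Object>> rows = new ArrayList<>();
    for (String line : lines.subList(1, lines.size())) {
        String[] cells = line.split(",", -1);        // naive split: no quoted-field handling
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < header.length; i++) {
            row.put(header[i].trim(), i < cells.length ? cells[i].trim() : null);
        }
        rows.add(row);
    }
    return rows;
}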

Two-datasource setup (Oracle → PostgreSQL)

When reading from a JDBC source (Oracle, MySQL, etc.) you need two separate DataSource beans. Spring Boot's auto-configuration only creates one, so you declare them explicitly:

@Configuration
public class DataSourceConfig {

    // Target (PostgreSQL) — always required. @Primary lets unqualified
    // DataSource injection points (such as MigrationRunner's) resolve to it.
    @Bean("targetDataSource")
    @Primary
    @ConfigurationProperties("spring.datasource.target")
    public DataSource targetDataSource() {
        // DataSourceBuilder produces a HikariDataSource by default, which
        // binds jdbc-url rather than url (see the YAML below)
        return DataSourceBuilder.create().build();
    }

    // Source (Oracle) — only required when reading via JDBC
    @Bean("sourceDataSource")
    @ConditionalOnProperty("spring.datasource.source.jdbc-url")
    @ConfigurationProperties("spring.datasource.source")
    public DataSource sourceDataSource() {
        return DataSourceBuilder.create().build();
    }

    // Fallback: reuse target if no source is configured (CSV-only mode).
    // Caveat: @ConditionalOnMissingBean is only guaranteed a stable evaluation
    // order inside auto-configurations; if it misfires here, collapse the two
    // sourceDataSource beans into one method that checks the Environment.
    @Bean("sourceDataSource")
    @ConditionalOnMissingBean(name = "sourceDataSource")
    public DataSource sourceDataSourceFallback(
            @Qualifier("targetDataSource") DataSource target) {
        return target;
    }
}

# application.yml — two-datasource variant
spring:
  datasource:
    source:
      # Hikari binds jdbc-url, not Spring Boot's usual url key, when the
      # DataSource is built via DataSourceBuilder as above
      jdbc-url:          jdbc:oracle:thin:@//oracle-host:1521/ORCL
      username:          ${SOURCE_USER}
      password:          ${SOURCE_PASS}
      driver-class-name: oracle.jdbc.OracleDriver
    target:
      jdbc-url:          jdbc:postgresql://pg-host:5432/targetdb
      username:          ${TARGET_USER}
      password:          ${TARGET_PASS}

Then, in your migration YAML, set sources[].type: jdbc and supply either a table: name or a custom sql: query, for example:
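
A sketch; only the type, table, and sql keys come from the sentence above, and the rest mirrors the illustrative config earlier:

# migration/02-orders.yaml (JDBC source)
sources:
  - type: jdbc
    table: ORDERS      # simplest case: read the whole table
    # sql: "SELECT * FROM orders WHERE status <> 'CANCELLED'"   # alternatively, a custom query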

@EnablePulsarideTransform — the full framework

For production workloads that need health monitoring, a Dead Letter Queue, a run registry, and CDC mode, add @EnablePulsarideTransform to your main class:

@SpringBootApplication
@EnablePulsarideTransform
@ConfigurationPropertiesScan
public class MigrationApp {
    public static void main(String[] args) {
        SpringApplication.run(MigrationApp.class, args);
    }
}

This annotation activates additional beans based on pulsaride.mode:

- BATCH (default): RunRegistry, MigrationEngine, MigrationMetrics, DlqService, PulsarideHealthIndicator
- CDC: RunRegistry, DlqService, PulsarideHealthIndicator, pulsarideEventProcessors. MigrationEngine is not activated (no source DB required).

# application.yml — when using @EnablePulsarideTransform
pulsaride:
  mode:         BATCH           # BATCH (default) | CDC
  registry-dir: .pulsaride/     # where run history is stored

The healthcare-migration-service example uses @EnablePulsarideTransform and demonstrates the full stack: deferred FK resolution, health endpoints, a Kafka CDC consumer, and the monitoring dashboard.
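
Assuming PulsarideHealthIndicator is a regular Spring Boot HealthIndicator (its name suggests so), exposing it over HTTP is plain actuator configuration, nothing library-specific. It requires spring-boot-starter-actuator plus a web starter, so it does not apply to the web-application-type: none batch setup above.

# application.yml (CDC / monitored deployments)
management:
  endpoints:
    web:
      exposure:
        include: health
  endpoint:
    health:
      show-details: always   # include per-indicator details, e.g. the pulsaride entry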

CDC mode — event-driven

In CDC mode the library processes row-level change events (INSERT / UPDATE / DELETE) emitted by Debezium. Your application provides a Kafka consumer that calls the processor per event. The library handles normalisation, transformation, upsert/delete routing, and circuit-breaking.

# Set in application-cdc.yml
pulsaride:
  mode: CDC

# Per-table config: specify the CDC event format
# (default is "debezium"; set cdc_format in the migration YAML)
# name: orders
# cdc_format: debezium   # debezium | flat | passthrough | custom
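
A minimal consumer sketch, assuming spring-kafka with String payloads and the process(event, headers) call shape copied from the batch example; the class name and topic are placeholders, and the library's real CDC entry point may differ (compare CdcRowProcessor.java in the example referenced below):

@Component
public class CdcOrdersConsumer {

    private final PulsarideEventProcessor processor;   // built from the orders config, as in BATCH mode
    private final JdbcRowWriter writer;
    private final ObjectMapper mapper = new ObjectMapper();

    public CdcOrdersConsumer(PulsarideEventProcessor processor, JdbcRowWriter writer) {
        this.processor = processor;
        this.writer    = writer;
    }

    @KafkaListener(topics = "dbserver.public.orders")   // placeholder Debezium topic
    public void onEvent(String payload) throws Exception {
        // Raw Debezium JSON → map; the DebeziumEventNormalizer inside the processor unwraps it
        Map<String, Object> event =
            mapper.readValue(payload, new TypeReference<Map<String, Object>>() {});

        Map<String, Object> out = processor.process(event, null);   // call shape assumed from the batch example
        if (out != null) {                                          // null = filtered out
            writer.upsertAll("orders", List.of(out), List.of("id"));
        }
        // The library also routes DELETE events; only the upsert path is sketched here.
    }
}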

See the healthcare-migration-service example (CdcRowProcessor.java and application-cdc.yml) for a complete working CDC setup.