[Camel in Action] 실전편 – Camel과 Spring Batch 통합으로 대용량 배치 처리

Camel과 Spring Batch의 역할 분담

Camel은 데이터 라우팅과 변환에 강하고, Spring Batch는 대규모 배치 처리(청크, 재시작, 스킵, 리트라이)에 강합니다. 두 프레임워크를 결합하면 각자의 강점을 극대화할 수 있습니다. Camel이 배치 Job을 트리거하고, Spring Batch가 대용량 처리를 안정적으로 수행합니다.

Camel로 Spring Batch Job 트리거

// 파일 도착 시 배치 Job 실행
from("file:batch-input?move=.processing")
  .process(exchange -> {
    String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
    JobParameters params = new JobParametersBuilder()
      .addString("inputFile", exchange.getIn().getHeader(Exchange.FILE_PATH, String.class))
      .addLong("timestamp", System.currentTimeMillis())
      .toJobParameters();
    exchange.getIn().setHeader("jobParams", params);
  })
  .bean(jobLauncher, "run(${header.importJob}, ${header.jobParams})")
  .process(exchange -> {
    JobExecution result = exchange.getIn().getBody(JobExecution.class);
    exchange.getIn().setHeader("jobStatus", result.getStatus());
  })
  .choice()
    .when(header("jobStatus").isEqualTo("COMPLETED"))
      .to("file:batch-input/.done")
    .otherwise()
      .to("file:batch-input/.failed")
  .end();

Spring Batch Job 정의

@Bean
public Job importOrdersJob() {
  return jobBuilderFactory.get("importOrders")
    .incrementer(new RunIdIncrementer())
    .flow(readStep())
    .next(processStep())
    .next(writeStep())
    .end()
    .build();
}

@Bean
public Step readStep() {
  return stepBuilderFactory.get("readStep")
    .<RawOrder, ProcessedOrder>chunk(1000) // 1000건씩 처리
    .reader(csvReader())
    .processor(orderProcessor())
    .writer(dbWriter())
    .faultTolerant()
    .skip(ValidationException.class).skipLimit(100) // 최대 100건 스킵
    .retry(SocketTimeoutException.class).retryLimit(3)
    .build();
}

배치 실패 재처리 전략

// 실패한 배치를 자동으로 재실행
from("quartz2:batch-retry?cron=0+*/30+*+*+*+?")
  .to("sql:SELECT job_name, job_parameters FROM batch_job_execution "
    + "WHERE status='FAILED' AND start_time > CURRENT_DATE")
  .split(body())
    .process(exchange -> {
      // 실패한 Job 재실행
      jobLauncher.run(jobs.get(exchange.getIn().getBody(Map.class).get("job_name")),
        buildParams(exchange.getIn().getBody(Map.class).get("job_parameters")));
    })
  .end();

실시간 배치 진행 상황 모니터링

// Spring Batch 리스너로 진행 상황을 Camel로 전달
@Component
public class BatchProgressListener implements StepExecutionListener {
  @Autowired
  private ProducerTemplate template;

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    Map<String, Object> progress = Map.of(
      "jobName", stepExecution.getJobExecution().getJobInstance().getJobName(),
      "stepName", stepExecution.getStepName(),
      "readCount", stepExecution.getReadCount(),
      "writeCount", stepExecution.getWriteCount(),
      "status", stepExecution.getStatus().toString()
    );
    template.sendBody("activemq:topic:batch-progress", progress);
    return stepExecution.getExitStatus();
  }
}

Leave a Comment