Camel과 Spring Batch의 역할 분담
Camel은 데이터 라우팅과 변환에 강하고, Spring Batch는 대규모 배치 처리(청크, 재시작, 스킵, 리트라이)에 강합니다. 두 프레임워크를 결합하면 각자의 강점을 극대화할 수 있습니다. Camel이 배치 Job을 트리거하고, Spring Batch가 대용량 처리를 안정적으로 수행합니다.
Camel로 Spring Batch Job 트리거
// 파일 도착 시 배치 Job 실행
from("file:batch-input?move=.processing")
.process(exchange -> {
String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
JobParameters params = new JobParametersBuilder()
.addString("inputFile", exchange.getIn().getHeader(Exchange.FILE_PATH, String.class))
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
exchange.getIn().setHeader("jobParams", params);
})
.bean(jobLauncher, "run(${header.importJob}, ${header.jobParams})")
.process(exchange -> {
JobExecution result = exchange.getIn().getBody(JobExecution.class);
exchange.getIn().setHeader("jobStatus", result.getStatus());
})
.choice()
.when(header("jobStatus").isEqualTo("COMPLETED"))
.to("file:batch-input/.done")
.otherwise()
.to("file:batch-input/.failed")
.end();
Spring Batch Job 정의
@Bean
public Job importOrdersJob() {
return jobBuilderFactory.get("importOrders")
.incrementer(new RunIdIncrementer())
.flow(readStep())
.next(processStep())
.next(writeStep())
.end()
.build();
}
@Bean
public Step readStep() {
return stepBuilderFactory.get("readStep")
.<RawOrder, ProcessedOrder>chunk(1000) // 1000건씩 처리
.reader(csvReader())
.processor(orderProcessor())
.writer(dbWriter())
.faultTolerant()
.skip(ValidationException.class).skipLimit(100) // 최대 100건 스킵
.retry(SocketTimeoutException.class).retryLimit(3)
.build();
}
배치 실패 재처리 전략
// 실패한 배치를 자동으로 재실행
from("quartz2:batch-retry?cron=0+*/30+*+*+*+?")
.to("sql:SELECT job_name, job_parameters FROM batch_job_execution "
+ "WHERE status='FAILED' AND start_time > CURRENT_DATE")
.split(body())
.process(exchange -> {
// 실패한 Job 재실행
jobLauncher.run(jobs.get(exchange.getIn().getBody(Map.class).get("job_name")),
buildParams(exchange.getIn().getBody(Map.class).get("job_parameters")));
})
.end();
실시간 배치 진행 상황 모니터링
// Spring Batch 리스너로 진행 상황을 Camel로 전달
@Component
public class BatchProgressListener implements StepExecutionListener {
@Autowired
private ProducerTemplate template;
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
Map<String, Object> progress = Map.of(
"jobName", stepExecution.getJobExecution().getJobInstance().getJobName(),
"stepName", stepExecution.getStepName(),
"readCount", stepExecution.getReadCount(),
"writeCount", stepExecution.getWriteCount(),
"status", stepExecution.getStatus().toString()
);
template.sendBody("activemq:topic:batch-progress", progress);
return stepExecution.getExitStatus();
}
}