본문 바로가기
인프라/Kafka

동시성제어(2) - Kafka

by jungmin.park 2023. 12. 12.

지난 redis를 사용하면서 겪은 문제가 있다.

문제1. 다른 서비스에 영향이 갈 수 있다.

현재 로직은 쿠폰 발급 요청이 들어오면 레디스를 활요해서 쿠폰의 발급 개수를 가져온 후 발급이 가능하다면 rds에 저장하는 방식이다.

이 방식은 쿠폰의 개수가 많아질수록 RDB에 부하를 주게 되는데

 

예시로

MySQL이 1분에 100개 insert 작업만 가능하다고 가정한다.

이 상태에서 10시에 만개의 쿠폰 생성 요청이 들어오고 10시 1분에 주문 생성 요청, 10시 2분에 회원 가입 요청이 들어온다면?

1분에 100개씩 만개를 생성하려면 100분이 걸리게 된다.

 

주문생성과 회원가입은 100분 이후에 생성이 된다.

 

문제2. DB서버의 부하로 서비스 지연 및 오류로 이어질 수 있다. 

만약 다른 서비스에 타임아웃이 없다면 느리게라도 모든 요청이 처리가 되겠지만 대부분의 서비스에는 타임아웃 옵션이 설정되어있다.

그렇기 때문에 주문 회원가입 뿐만 아니라 일부분의 쿠폰도 생성이 되지 않는 오류가 발생가능하다.

 


KafKa의 도입

  • 카프카는 분산 이벤트 스트리밍 플랫폼입니다.
  • 이벤트 스트리밍이란 소스에서 목적지까지 이벤트를 실시간으로 스트리밍하는 것을 의미한다.

 

KafKa와 Redis 비교

  Apache Kafka Redis
메시지 크기 압축 및 계층형 스토리지로 최대 1GB의 메시지 크기를 지원합니다. 더 작은 메시지 크기를 지원합니다.
메시지 전송 구독자는 대기열에서 메시지를 가져옵니다. Redis 서버는 연결된 가입자에게 메시지를 푸시합니다.
메시지 보존 검색 후 메시지를 보존합니다. 메시지를 보관하지 않습니다.
오류처리 메시징 수준에서의 강력한 오류 처리, 배달 못한 편지 대기열, 이벤트 재시도 및 리디렉션 제한 시간, 클라이언트 제한 및 메모리 버퍼 용량을 사용하여 애플리케이션 수준에서 redis 예외를 처리해야 합니다.
병렬 처리 Kafka는 병렬 처리를 지원합니다. 여러 소비자가 동일한 메시지를 동시에 검색할 수 있습니다. 병렬 처리를 지원하지 않습니다.
처리량 비동기 읽기/쓰기로 인해 처리량이 더 높습니다. Redis 서버가 다른 구독자에게 메시지를 보내기 전에 회신을 기다려야 하기 때문에 처리량이 줄어듭니다.
지연시간 짧은 지연 시간, 기본적으로 데이터 복제로 인해 Redis보다 약간 느립니다. 작은 크기의 메시지를 배포할 때 지연 시간이 매우 짧습니다.
내결함성 파티션을 다른 브로커에 자동으로 백업합니다. 기본적으로 백업되지 않습니다. 사용자는 Redis 지속성을 수동으로 활성활 할 수 있습니다. 대규모 데이터 손실 위험.

 

https://aws.amazon.com/ko/compare/the-difference-between-kafka-and-redis/

 

Redis와 Kafka 비교 - 게시/구독 메시징 시스템 간의 차이점 - AWS

Apache Kafka는 대규모 데이터 세트를 스트리밍하고 높은 복구 성능이 필요한 애플리케이션을 구축하는 데 더 적합합니다. 전달되는 수조 개의 메시지를 처리할 수 있는 단일 분산 데이터 파이프라

aws.amazon.com

 


Spring Boot build.gradle 추가

implementation 'org.springframework.kafka:spring-kafka'

 

 

Producer 인스턴스를 생성하기 위한 설정  및 Kafka Template 생성

@Configuration
public class KafkaProducerConfig {
	@Bean
	public ProducerFactory<String, Long> producerFactory(){
    	// 설정값들을 담아줄 맵
		Map<String, Object> config = new HashMap<>();
		
        // 서버의 정보
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 키 시리얼라이즈 클래스 정보
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 밸류 시리얼라이즈 클래스 정보
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

		return new DefaultKafkaProducerFactory<>(config);
	}

	@Bean
	public KafkaTemplate<String, Long> kafkaTemplate(){
		return new KafkaTemplate<>(producerFactory());
	}
}
  • producer 인스턴스를 생서하는데 필요한 설정값들을 설정해준다.

 

Producer 생성

  • 토픽에 데이터를 전송
@Component
public class CouponCreateProducer {
	private final KafkaTemplate<String, Long> kafkaTemplate;

	public CouponCreateProducer(KafkaTemplate<String, Long> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	public void create(Long userId){
    	// coupon_event는 토픽의 이름
		kafkaTemplate.send("coupon_event", userId);
	}
}

 

ApplyService에 Producer 추가

public void apply(Long userId){
		//long count = couponRepository.count();
		Long count = couponCountRepository.increment();
		//log.info("쿠폰 개수: {}", count);

		if(count > 100){
			return;
		}
		//couponRepository.save(new Coupon(userId));
		couponCreateProducer.create(userId);
	}

 


Consumer 생성하기 위한 Config 및 리스너 설정

  • 컨슈머에서는 카프카를 사용하고 쿠폰을 생성할 것이기 때문에 MySql이 필요
@Configuration
public class KafkaConsumerConfig {
	@Bean
	public ConsumerFactory<String, Long> consumerFactory(){
		Map<String, Object> config = new HashMap<>();

		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_2");
		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

		return new DefaultKafkaConsumerFactory<>(config);
	}
	
    
    // 토픽으로부터 메시지를 전달받기 위한 카프카 리스너
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory(){
		ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());

		return factory;
	}

}

 

Consumer 생성

@Component
public class CouponCreatedConsumer {
	private final CouponRepository couponRepository;

	public CouponCreatedConsumer(CouponRepository couponRepository) {
		this.couponRepository = couponRepository;
	}

	@KafkaListener(topics = "coupon_event", groupId = "group_2")
	public void listener(Long userId){
		System.out.println(userId);
		couponRepository.save(new Coupon(userId));
	}
}

 

Test코드

@Test
	public void 여러명응모() throws InterruptedException {
		int threadCount = 1000;
		ExecutorService executorService = Executors.newFixedThreadPool(32);
		//다른스레드에서 수행해주는 작업을 기다려주는 클래스
		CountDownLatch latch = new CountDownLatch(threadCount);

		for( int i = 0; i < threadCount; i++){
			long userId = i;
			executorService.submit(() -> {
				try{
					applyService.apply(userId);
				}finally {
					latch.countDown();
				}
			});
		}

		latch.await();

		Thread.sleep(10000);
		long count = couponRepository.count();


		assertThat(count).isEqualTo(100);
	}

 

 

 

 

Thread을 추가한 이유

  • Thread을 추가하지 않았을때 기대했던 100장이 나왔다 원인이 무엇일까?

 

실시간 데이처리가 아니기 때문이다.

Time Test case Producer Consumer
10:00 테스트케이스 시작   데이터 수신중..
10:01   데이터 전송완료 데이터 처리..
10:02 테스트 케이스 종료   데이터 처리..
10:03     데이터 처리..
10:04     데이터 처리완료
  • 10시에 테스트 케이스를 시작한다고 가정
  • 프로듀서가 데이터를 전송 완료한 시점이 10시 1분
  • 테스트 케이스가 종료되는 시간 10시 2분
  • 컨슈머는 데이터를 수신하고 있다가 토픽에 데이터가 전송되면 데이터를 받아서 쿠폰을 생성
  • 쿠폰이 모두 생성되는 시간을 10시 4분이라고 가정
  • 테스트 케이스는 데이터가 전송이 완료된 시점을 기준으로 쿠폰의 개수를 가져오고 컨슈머에서는 그 시점에 아직 모든 쿠폰을 생성하지 않았기 때문에 테스트 케이스가 실패하는 것이다.

결국 Thread.sleep은 컨슈머가 객체를 생성할 시간을 벌어주는 것이다.

카프카를 사용한다면 API에서 직접 쿠폰을 생성할 때에 비해서 처리량을 조절할 수 있게 된다.

처리량을 조절함에 따라서 데이터베이스의 부하를 줄일 수 있다는 장점이 있습니다.

하지만 쿠폰 생성까지 약간의 텀이 발생된다는 단점이 존재합니다.

 

 

 


쿠폰을 1인당 1개로 개수를 제한하자

방법1. 데이터베이스의 유니크키를 사용

  • 쿠폰 테이블에 userid와 coupon type이라는 컬럼을 추가한 뒤에 userid와 coupon type의 유니크키를 검으로써 1개만 생성되도록 데이터베이스 레벨에서 막는 방법 
  • 하지만 보통의 서비스는 한 유저가 같은 타입을 여러개 가질 수 있기 때문에 실용적이지 않다.

방법2. 범위로 락을 잡고 처음에 쿠폰 발급 여부를 가져와서 판단하는 방식

  • 범위로 락을 잡고 쿠폰 발급 여부를 가지고 온 뒤에 아직 발급이 안됐다면 쿠폰 발급을 하고 발급이 됐다면 쿠폰을 발급하지 않는 방법
  • 지금 쿠폰 생성은 컨슈머에서 생성하고 있기 때문에 시간차가 존재하며  그 시간차이 때문에 한 명이 두개가 발급되는 경우 발생
    • API에서 직접 쿠폰을 발급한다고해도 락 범위가 너무 넓어지게 된다.

방법3. Set 자료구조 사용

  • 유저가 쿠폰 응모를 하면 set에 추가하고 set에 존재하지 않는다면 발급을 진행하고 이미 존재하는 유저라면 발급하지 않을 것이다.

 

Set을 관리할 레파지토리 생성

@Repository
public class AppliedUserRepository {
	private final RedisTemplate<String, String> redisTemplate;

	public AppliedUserRepository(RedisTemplate<String, String> redisTemplate) {
		this.redisTemplate = redisTemplate;
	}
	
    // 셋의 데이터를 넣기 위한 메소드 생성
	public Long add(Long userId){
		return redisTemplate
			.opsForSet()
			.add("applied_user", userId.toString());
	}
}

 

ApplyService 변경

public void apply(Long userId){
		Long apply = appliedUserRepository.add(userId);

		if(apply != 1){
			return;
		}
        ..
        
}
  • 만약 추가된 개수가 1이 아니라면 유저는 이미 발급 요청 했던 유저로 쿠폰을 발급받지 않고 리턴해준다.
  • 추가된 개수가 1이라면 기존의 쿠폰 발급 로직을 수행
@Test
	public void 한명당_한개의쿠폰() throws InterruptedException {
		int threadCount = 1000;
		ExecutorService executorService = Executors.newFixedThreadPool(32);
		CountDownLatch latch = new CountDownLatch(threadCount);

		for( int i = 0; i < threadCount; i++){
			long userId = i;
			executorService.submit(() -> {
				try{
					applyService.apply(1L);
				}finally {
					latch.countDown();
				}
			});
		}

		latch.await();

		Thread.sleep(10000);
		long count = couponRepository.count();


		assertThat(count).isEqualTo(1);
	}

'인프라 > Kafka' 카테고리의 다른 글

[Kafka] Kafka Topic 삭제  (0) 2023.12.13