'프로그래밍' 카테고리의 글 목록 :: 잡다한 프로그래밍
반응형

1. Backpressure(배압) 현상 이란?

배압이란 데이터 생산(Producer)과 소비(Consumer)가 불균형적일 때 일어나는 현상이다.

만약 10,000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면? 데이터는 스트림에 계속 쌓이게 된다.

이는 OutOfMemoryError(OOM)로 이어져 어플리케이션이 죽게 될 것이다.

이러한 현상을 배압(Backpressure)이라고 하며 RxJava에서는 배압 현상을 제어할 수 있는 방법을 제공한다.

 

2. 어디서 주로 발생?

생산자, 소비자 라는 내용이 이전 내용과 비슷하지 않은가? (대용량 처리, kafka 참고) 생산자가 reactive하게 데이터를 쌓고, 소비자가 이를 가져다가 사용하는 구조에서 주로 발생하게 된다.

※ 정리하면 non-blocking 방식으로 데이터를 보내는구조에서 발생

 

3. Backpressure를 예방하기 위한 전략

 

반응형
반응형

1. 카프카의 기본 구성

카프카는 데이터를 받아서 전달하는 데이터 버스의 역할을 한다. (이전 MQ와 동일한 형태)

그리고 주키퍼는 카프카의 정상 동작을 보장하기위해 메타데이터를 관리한다.

 

앞서 설명한 Message Queue처럼, producer는 카프카에 메세지를 전달하고, consumer는 카프카에서 메세지를 꺼내간다.

이때 이 카프카에 대한 설정은 주키퍼에서 담당한다.

 

2. 카프카의 기본 개념

  • 브로커: 카프카가 설치되 서버, 또는 노드를 의미함
  • 프로듀서: 카프카로 메세지를 보내는 역할을 하는 클라이언트
  • 컨슈머: 카프카에서 메세지를 꺼내가는 역할을 하는 클라이언트
  • 토픽: 카프카는 메시지 피드들을 토픽으로 구분, 각 토픽은 카프카내에서 고유함
  • 파티션: 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러개로 나눈 것을 말함
  • 세그먼트: 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일을 말함
  • 메세지: 프로듀서가 브로커로 전송하고나, 컨슈머가 읽어가는 데이터를 의미함
  • 주키퍼(ZooKeeper): 카프카의 메타데이터 관리 및 브로커의 정상상태 점검(health check)을 담당합니다

 

토픽(TOPIC)

- producer가 실제로 보내는 메세지가 담기는 곳

- 하나의 토픽에 여러 프로듀서들이 메세지를 전송할 수 있고, 여러 컨슈머가 하나의 토픽에서 데이터를 읽어올 수 있다.

- 토픽을 효율적으로 관리하기위해, 토픽은 여러 파티션으로 구성되어야한다

 

파티션

여러프로듀서 A, B, C가 같은 'TOPIC1'에 데이터를 전송했다고 가정해보자.

토픽에 파티션이 1개라면 프로듀서 A, B, C가 보내는 데이터는 해당 파티션이 모두 감당해야한다.

이 경우 하나의 데이터 전송 완료 이후, 다음 메세지 전송이 가능해지면서 전송 속도에 영향을 미친다.

 

- 파티션을 여러개로 늘려 하나의 토픽을 병렬 처리할 수 있돌혹 한다.

 

▶ 적절한 파티션의 수는 어떻게 정할 수 있을까?

파티션의 수는 초기 생성 후 언제든 늘릴 수 있지만, 다시 줄일 수 없음

따라서 파티션수를 작게, 2개 혹은 4개 정도로 생성한 후, 메시지 처리량이나 컨슈머의 LAG(카프카의 남아있는 메시지수)를 보고 늘려 가는 방법이 가장 좋다.

https://eventsizer.io 를 참고해 적절한 파티션 수를 산정할 수도 있다 (참고용)

 

세그먼트

프로듀서가 hello my name is A라는 메세지가 "TOPIC1" 에 파티션 0에 저장되었다면 해당 메시지는 어떻게 저장되고 있을까?

바로 파티션0에 세그먼트 라는 로그 파일의 형태로 브로커의 로컬 디스크에 저장된다.

해당 로그파일을 확인하고 싶다면 /data/kafka-logs/에서 확인할 수 있다.

 

리플리케이션

리플리케이션이란, 각 메시지들을 여러개로 복제해서 카프카 클러스터내 브로커들에게 분산시키는 동작을 의미한다.

이러한 리플리케이션 동작덕분에 하나의 브로커가 종료되더라도 카프카는 안정성을 유지할 수 있다.

 

위 그림은 'Topic1'을 리플리케이션 팩터 3으로 설정한 후 각 브로커에 배치된 상태이다.

그림에 나타난대로, 'Topic1'은 원본을 포함해 총 3개가 있다. (정확하게는 토픽이 리플리케이션이 되는것이 아니라 파티션이 리플리케이션 된다)

 

▶ 적절한 리플리케이션 팩터 수는 어떻게 정할 수 있을까?

리플리케이션 팩터수가 커지면 안정성은 높아지지만, 브로커의 리소스를 많이 사용하게 된다. (저장하는 양이 많아지니까)

테스트나 개발환경 = 리플레이션 팩터 수 1

운영 환경 (로그성 메시지로 약간의 유실 허용): 리플리케이션 팩터 수 2

운영 환경 (유실 허용 X) 리플리케이션 팩터 수 3

 

※ 실전 카프카 개발부터 운영까지 고승범 작가에 의하면, 리플리케이션 팩터 수 3정도면 충분하게 안정성 보장과, 적절한 디스크 공간을 사용할 수 있다고 한다. 더 늘어날 경우 디스크 공간을 많이 사용할 수 있음을 인지하도록 하자.

 

실제 카프카를 사용하기 전, 개념 정리를 완벽하게 하고 가도록 하자.

반응형
반응형

- 앞서 공부한 MQ중 널리 사용되는 kafka를 설치하고 공부하도록 할 예정이다.

- AWS, 온프레미스 방식을 이용해도 되지만. 간단하게 도커, 도커 컴포즈를 이용하여 설치하도록 할 예정이다.

 

- 주키퍼3, 카프카3 형태의 클러스터 설치 방법을 공유할 예정이다.

주키퍼 = 과반수 방식을 유지해야하므로 홀수 구성 필요(Elasticsearch 클러스터 구성과 유사)

카프카 = 과반수 방식이 아니라 반드시 3대를 만들 필요는 없지만, 리플리케이션 팩터 수를 3을 충족시키기 위해 최소 3대의 클러스터 구조로 구성한다

 

1. 도커 설치 방법

# docker 리포지토리에 접근하기 위한 키 생성 설정
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

# 패키지 매니저가 docker 설치 시, 설치 위치를 알기 위한 repository 추가
sudo add-apt-repository \ 
 "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" 

# 위에서 추가한 repository를 위해서 업데이트!
sudo apt update

# docker 설치
sudo apt install docker-ce

sudo systemctl status docker

2. 도커 컴포즈 설치 방법

sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
chmod +x /usr/bin/docker-compose

docker-compose -v

 

3. kafka 설치 방법

git clone https://github.com/onlybooks/kafka2

cd kafka2/appendix_C/cluster_zk_kafka/

docker-compose up -d

4. 설치 완료 확인

docker ps 명령어로 kafka1,2,3 zk1,2,3이 동작중인지, status 가 up인지 확인한다.

 

http://{{IP}}:9000/ 로 접속하면 CMAK를 확인할 수 있다.

해당 CMAK에서  아래와 같이 클러스터를 등록하면 된다.

반응형
반응형

1. Message Queue 란?

- 메시지 큐(Message Queue)는 프로세스 또는 프로그램 간에 데이터를 교환할 때 사용하는 통신 방법 중에 하나로, 메시지 지향 미들웨어(Message Oriented Middleware:MOM)를 구현한 시스템을 의미한다. 메시지 지향 미들웨어란 비동기 메시지를 사용하는 응용 프로그램들 사이에서 데이터를 송수신하는 것을 의미한다. 여기서 메시지란 요청, 응답, 오류 메시지 혹은 단순한 정보 등의 작은 데이터가 될 수 있다.

1. 메시지 전송 시 생산자(Producer)로 취급되는 컴포넌트가 메시지를 메시지 큐에 추가한다.

2. 해당 메시지는 소비자(Consumer)로 취급되는 또 다른 컴포넌트가 메시지를 검색하고 이를 사용해 어떤 작업을 수행할 때까지 메시지 큐에 저장된다.

3. 각 메시지는 하나의 소비자에 의해 한 번만 처리될 수 있는데, 이러한 이유로 메시지 큐를 이용하는 방식을 일대일 통신이라고 부른다.

 

Message Queue는 언제쓰나요?

  • 메시지 큐는 소비자(Consumer)가 실제로 메시지를 어느 시점에 가져가서 처리하는 지는 보장하지 않는다.
  • 언젠가는 큐에 넣어둔 메시지가 소비되어 처리될 것이라고 믿는 것이다.
  • 이러한 비동기적 특성 때문에 메시지 큐는 실패하면 치명적인 핵심 작업보다는 어플리케이션의 부가적인 기능에 사용하는 것이 적합하다.

1. 비동기 작업이 필요한 서비스

이메일전송
나는 이미 이메일을 전송했고, 실제 받는 사람이 읽을 때까지 시간은 걸리겠지만, 해당 작업이 완료처리 될 것을 우리는 알고있다.
바로 실시간으로 처리되지 않아도 서비스에 크게 문제 없는 이런 작업에 MQ를 사용할 수 있다.
즉, MQ는 어느 정도의 응답 지연이 허용되며, 어플리케이션의 핵심 기능은 아닌 경우에 사용하는 것이 적합하다.

 

2. 시스템 간 통신

서버 간 데이터를 주고 받거나 작업을 요청할 때, 시스템 장애를 생각해야한다.

MQ를 사용할 경우 이러한 처리를 간편하게 할 수 있다.

\

- P는 C에 직접 요청하는것이 아닌, MQ에 메세지를 전달한다.

- C는 MQ를 구독하고 MQ로부터 데이터를 수신하여 처리한다.

 

- C가 데이터를 수신할 수 없는 상황이더라도, 데이터는 보장된다 (C가 복구된 후 다시 수신 가능)

- C의 데이터처리 TPS에 맞는 속도로 데이터를 처리할 수 있다. (구현이 쉬워짐)

 

3. 서버 부하가 많은 작업 (2번과 같은 구조)

- 이미지 처리, 비디오 인코딩, 빅데이터 등 대용량 데이터 처리와 같은 작업은 메모리, CPU를 많이 사용한다.

이러한 작업은 동시에 처리할 수 있는 양이 한정적이므로 MQ를 사용하여 서버가 처리할 수 있는 양을 가져와 처리할 수 있다.

 

4. 부하분산

- 부하를 분산 하여 처리할 수 있다. 여러 C(Customer)를 배치해, 원하는 메세지(데이터)의 처리가 가능하다.

- 해당 구조는 수평구조이기 때문에 수평적 확장에 유리하다 (C를 늘리는 구조)

message queue 장점

  • 비동기(Asynchronous)
    • 메시지 큐는 생산된 메시지의 저장, 전송에 대해 동기화 처리를 진행하지 않고, 큐에 넣어 두기 때문에 나중에 처리할 수 있다.
    • 여기서, 기존 동기화 방식은 많은 메시지(데이터)가 전송될 경우 병목이 생길 수 있고, 뒤에 들어오는 요청에 대한 응답이 지연될 것이다.
  • 낮은 결합도(Decoupling)
    • 생산자 서비스와 소비자 서비스가 독립적으로 행동하게 됨으로써 서비스 간 결합도가 낮아진다.
  • 확장성(Scalable)
    • 생산자 서비스 혹은 소비자 서비스를 원하는 대로 확장할 수 있기 때문에 확장성이 좋다.
  • 탄력성(Resilience)
    • 소비자 서비스가 다운되더라도 어플리케이션이 중단되는 것은 아니다. 메시지는 메시지 큐에 남아 있다. 소비자 서비스가 다시 시작될 때마다 추가 설정이나 작업을 수행하지 않고도 메시지 처리를 시작할 수 있다.
  • 보장성(Guarantees)
    • 메시지 큐는 큐에 보관되는 모든 메시지가 결국 소비자 서비스에게 전달된다는 일반적인 보장을 제공한다.
반응형
반응형

1. Transaction에서 rollback이 되지않는 경우는?

- checked exception은 트랜잭션에서 롤백되지않는다. (명시적인 예외처리가 필요한 것, try catch해야하는것 들)

- unchecked exception은 롤백대상 (예외처리를 하지 않아도 IDE에서 에러 뱉지 않음, Runtime Exception...)

 

2. checked exception 경우에도 rollback을 하고싶으면?

만약 B에서 에러가 발생했을때 A를 rollback시키고 싶다면? 이럴때 transaction을 이용한다

하지만 B의 exception이 checked exception이라면?

@Transactional
public void test() {
    A();
    B();
}

만약 아래처럼 try catch를 이용하면 에러를 catch에서 처리했기 때문에, rollback이 발생하지 않는다.

@Transactional
public void test() {
    A();
    try {
    	B(); // checked exception을 발생시키는 부분
    } catch {
    
    }
}

 

- @Transcational 어노테이션에 rollbackFor라는 옵션을 이용한다

해당 에러는 throw하고, rollbackFor옵션을 사용하면 checked exception도 확인할 수 있게 된다.

@Transactional(rollbackFor = {Exception.class})
public void test() {
    A();
    B(); // checked exception을 발생시키는 부분에서는 error throw
}

 

- checked 예외를 unchecked 예외로 변경하여 throw한다

public void B() {
    try {
    
    }catch(Exception e) {
    	throw new RuntimeException("예외");
    }
}


@Transactional
public void test() {
    A();
    B(); // unchecked exception으로 바꿈
}

 

3. 아래와 같은 경우에는 롤백이 일어날까 일어나지 않을까?

	// ATest 클래스
    @Transactional
    public void A() {
        try {
            test.B();
        } catch (RuntimeException e) {
            System.out.println("예외 처리");
        }
    }
    
    
    // BTest클래스
    @Transactional
    public void B() {
        //로직
        mapper.save();
        throw new RuntimeException("예외"); // unchecked exception
    }

 

해당 코드는 다음과 같이 동작한다.

1. A클래스의 트랜잭션이 실행된다.

2. B메소드가 실행되면서 1번의 트랜잭션에 참여한다 (기본 propagation 속성이 PROPAGATION_REQUIRED)

3. save를 실행하는 부분의 처리가 끝나고 트랜잭션의 완료처리 (completion)을 진행함

4. checked Exception이 일어나면서 트랜잭션이 완료처리 됨

5. checked Exception때문에 해당 트랜잭션을 롤백 규칙을 적용(기본 규칙적용), 해당 메소드에서 바로 롤백하지않고 rollback mark를 함 (해당 마크는 전역으로 관리함)

6. A의 로직 수행 > 1번에서 생성된 트랜잭션의 완료처리가 진행, 이때 rollback mark를 확인해서 값이 true라면 rollback진행

따라서 이경우에는 롤백되어버린다

 

해당 propagation 옵션을 변경할경우 해당 rollback mark 문제를 해결할 수 있다

 

PROPAGATION_REQUIRES_NEW 사용시 매번 새로운 트랜잭션 생성

- A와 B가 각각 트랜잭션을 생성하고, 매 번 commit하므로 서로의 rollback에 영향을 미치지 않음

하지만 오버헤드 발생할 수 있음 (매번 새로운 커넥션 생성)

반응형
반응형

mybatis Cursor란?

- 공식문서에 다음과 같이 설명하고 있다

A Cursor offers the same results as a List, except it fetches data lazily using an Iterator.

(Cursor는 Iterator를 사용하여 Lazy하게 데이터를 가져오고 이는 List와 동일한 결과를 제공한다)

 

간단 정리 : 대량 데이터(대량 ROW)를 가져올때 사용하는 방법

- 약 1천만건의 데이터를 List에 담아서 어떠한 처리를 한다고 가정했을때 기존 방법은 OOM 발생 > CURSOR사용하면 해결 가능

 

1. cursor를 사용할때 동작방식의 차이

 

- 기존 방식의 동작 방법

1. DAO Mapper 인터페이스 선언을 바탕으로, Mybatis가 동적으로 생성한 코드로 DB작업을 준비

2. DAO Mapper를 통해 DB작업이 진행되면 알맞은 드라이버 (ex: JDBC)나 풀을 통하여 작업을 수행

3. 2.의 작업이 완료될때까지 코드 블로킹 (통상 service 코드 블로킹)

4. DB작업이 끝나면 spring 으로 돌아옴 (서비스로)

- Cursor를 이용했을 때 동작 방법

1. DAO Mapper 인터페이스 선언을 바탕으로, Mybatis가 동적으로 생성한 코드로 DB작업을 준비

2. DAO Mapper를 통해 DB작업이 진행되면 알맞은 드라이버 (ex: JDBC)나 풀을 통하여 작업을 수행

3. 트랜잭션을 시작, cursor의 경우 2의 작업이 cursor로 iteration을 반복할 수 있는 상태가 되면 모든 데이터가 받아지지 않더라도 DAO interface를 통해 cursor를 반환한다. (반환한 cursor를 서비스에서 처리 후 > 다시 반복)
4. DB 커넥션이 유지되는 동안 필요한 작업을 수행. cursor가 데이터셋의 끝에 도달할때까지 반복 가능

5. 트랜잭션 종료 (커넥션 종료)

 

 

3번에 따라 JVM 메모리에 한번에 모든 결과를 올려둘 필요가 없으므로, 충분한 시간만 주어진다면 조회 데이터 수가 많더라도 OOM 없이 데이터를 모두 읽어서 처리할 수 있다.

 

2. 사용 방법

기존 코드를 다음과 같이 개선하여 사용할 수 있다.

 

- 기존 Mapper 코드

List<TestDto> selectTest();

- 개선 Mapper 코드

Cursor<TestDto> selectTest();

 

- 기존 서비스 코드

@Service
@RequiredArgsConstructor
public class testService {
    @Autowired
    private TestMapper testMapper;

    public void test() {
    	List<TestDto> list = testMapper.selectTest();
        ...
        ....
    }
}

 

- 개선 서비스 코드

@Service
@RequiredArgsConstructor
public class testService {
    @Autowired
    private TestMapper testMapper;

	@Transactional
    public void test() {
    	try( Cursor<TestDto> list = testMapper.selectTest()) {
        	for (TestDto dto : list) {
            //....
            //..
            
            }
        } catch (Exception e) {
        	// ..
        }
    }
}

왜 이렇게 개선되어야 할까? 이는 앞서 말한 동작방식을 보면 알 수 있다.

Cursor 는 한줄씩 데이터를 처리할 수 있게 해준다고 앞에서 언급했는데, 이는 즉 데이터 처리가 끝나면 다음 줄을 읽어와야하는것을 의미한다.

따라서 전체 데이터를 모두 순회 할때까지 DB 연결이 유지되어야 한다는 걸 의미한다.

 

정리해보면

1. 해당 서비스 메소드에 @Transactional 을 달아 트랜잭션 상태를 유지시킨다

2. Service 메소드를 벗어나기전 Cursor를 써야하는 작업을 모두 마쳐야한다.

 

3. cursor와 fetchsize의 관계

네트워크 통신보다 메모리에 있는 내용을 처리하는 속도가 훨씬 빠름 따라서 얼만큼 처리할 데이터를 메모리에 올려놓는지 적절히 조율이 필요함

  • 통신 빈도를 줄인다 - 통신 한번에 받아올 데이터의 양이 늘어난다(캐시를 많이 해야하므로 JVM 메모리를 많이 먹는다)
  • 통신 빈도를 늘린다 - 통신 한번에 받아올 데이터의 양이 줄어든다(캐시를 적게 해도 되므로 JVM 메모리를 적게 먹는다)

 

4. 내가 겪었던 상황

 

테이블 A, 테이블 B의 데이터를 JOIN해서 select 해오고 있었음

A에 약 100만건, B에 1000만건의 데이터가 있다고 가정

 

JOIN하는 순간부터 너무 많은 시간이 걸림

반응형
반응형

JWT 사용시 csrf.disable()를 한다.

Cross-Site Request Forgery

이용자가 의도하지 않은 요청을 통한 공격을 의미한다.

즉, CSRF 공격이란, 인터넷 사용자(희생자)가 자신의 의지와는 무관하게 공격자가 의도한 행위(등록, 수정, 삭제 등)를 특정 웹사이트에 요청하도록 만드는 공격이다.

 

- session, cookie를 이용하는 경우, 서버에서 사용자가 올바른지 검증하게 된다. 서버에서 검증을 진행하다보니 의도하지않은 공격이 있는지 확인하는 절차가 필요하지만,

JWT의 경우 토큰값에 정보들이 들어있어 서버에서 별다른 검증이 필요하지않으므로 csrf를 disable해도 무방하다.

 

- spring security에서도 권장함

반응형
반응형

대용량 트래픽 처리 시스템이란?

- 하나의 서버, 데이터베이스로 감당하기 힘든 부하를 처리하는것 > 다수의 서버와 데이터베이스를 마치 하나인것처럼 동작하게한다 (이안에는 여러 마이크로서비스들 포함)

 

대용량 트래픽 처리를 위한 특징 세가지

1. 고가용성

- 언제든 서비스를 이용할 수 있어야함

 

2. 확장성

- 시스템이 비대해짐에 따라 증가하는 데이터와 트래픽에 대응할 수 있어야한다.

 

3. 관측 가능성

- 문제가 생겼을 때 빠르게 인지할 수 있어야하고 문제의 범위를 최소화 할 수 있어야함

 앞서 DB의 병목현상에 대해 알아봤는데, 점진적으로 대용량 트래픽 처리 시스템을 발전시켜보자

1. 기본 구성

2. 사용자의 증가로인해 서버의 응답속도가 느려짐 > 서버의 스케일 아웃

해당 서버의 부하를 분산하는건 로드밸런서(nginx...)의 역할 (RR알고리즘....등)

 

3. 서버를 충분히 늘렸음에도 DB응답속도가 느려짐

> 쿼리튜닝(인덱스 등), 로컬캐시 (각 서버의 메모리에 캐싱), 글로벌캐시 (redis)

글로벌 캐시 예시

캐시를 이용할경우, 주기, 만료정책 등을 고려해야함

 

> DB 다중화 (스케일 아웃) 아키텍처 참고 https://blog.naver.com/takane7/221440417322

 

웹 어플리케이션 시스템 아키텍쳐의 변화

초기 아키텍쳐에서부터 시작해 클러스터링 아키텍쳐로의 확장을 고려하기까지의 스토리를 다뤄보고자 합니...

blog.naver.com

4. 이메일, 알림과 같은 대외기관 서버와의 연동이 많이 필요 해짐, 대외기관의 응답이 느려짐

> 클라이언트는 대외기관의 응답을 기다리느라, 요청을 기다리게 됨

비동기 큐 = kafka rabbitmq...(스레드풀을 이용한 비동큐도 가능)

비동기 큐에 요청을 쌓으므로서, 대외기관과의 트랜잭션을 클라이언트 요청에서 제외 시킬 수 있음 (내 서버에 데이터에만 의존하게 됨)

비동기 큐를 사용하면, 대외기관의 적정 TPS의 맞게 요청량을 조절할 수 있음

반응형

+ Recent posts