LostCatBox

SpringProject-TrackingPost-CH03

Word count: 2.2kReading time: 13 min
2022/12/24 Share

Project 통합택배조회 api 03편 (두번째 구현)

Created Time: September 8, 2022 10:24 PM
Last Edited Time: December 12, 2022 8:51 PM
References: https://velog.io/@soyeon207/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B6%80%ED%8A%B8-%EB%A9%80%ED%8B%B0-%EB%AA%A8%EB%93%88-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%A7%8C%EB%93%A4%EA%B8%B0
Tags: Java, Spring, Computer

왜?

msa로 분리하고 kafka 적용하기

현재는 모든 서비스가 한 서버내에서 모두 돌아가고있다.

고려한 아키택처

첫번째

1

해당 구조는 분명 scale out시에 서비스 각자의 DB를 가지므로 이 DB에 대해서 동기화라는 부분이 중요하였다. 이를 kafka connect로 해결할수있었지만, 지금 내가 구현하고싶은 서비스가 이정도로 확장성을 고려해야할지 의문이라 선택하지 않았다.

두번째

2

kafka를 메시지 q로 사용하여, 외부 API조회용과 값을반영하여 DB에 저장하기위해서 DBservice를 통해 DB에 중복체크 및 반영되도록 설계하였다. DBservice가 DB만을 관리하는 서비스로 하고싶었지만, DB를 조회하는 입장에서는 kafka를 통해서 할수가없었으므로 (조회후 응답받아야함.) DB랑 직접 연결하였다. 그럼 DBservice가 그렇게 큰 역할을 하고있지않았다. 또한 외부 택배 API조회가 동기 처리가되어있으므로, 카카오 채널에 대해 응답이 바로 나갈수없었고 기다려야했으므로 선택하지않았다.

세번째(선택)

3

(선택한 아키텍처) 카카오 채널 요청을 바로 응답해줄수있을뿐만 아니라, 외부 택배 조회 api로 나눠 해당 서비스 필요시 내부 kafka요청을 통해 동작요청이 가능하게하였고, 하나의 DB에 외부택배조회서비스와 조회기능서비스가 같은 DB를 바라보고있으므로, 각 서비스별로 DB에 대해서 동기화해줄필요도없었다. → 이는 DB가 하나이므로 장점이될수있다.

다만, 추후 서비스가 DB가 하나이므로 트랙잭션 처리가 매우 중요해질수있다…그때는 각 서비스별로 DB를 쪼개고 동기화하는 방식인 첫번째 안을 고려할것이다.

구현

멀티 모듈

공통의 관심사를 가진 것을 하나로 묶어놓는 것

common(공통적인) 코드들을 묶어서 놓을수있고, DTO등 많은 부분들을 공유가능하여, 변경사항이 생겻을때 따로 각자 변경할필요없는 장점을 가진다

root와 module 나누기(인텔리제이기준)

  • root에 해당하는 곳에서 new로 module 추가
  • root에 settings.gradle에서 해당
    include(”새로운프로젝트명”)입력
1
2
3
4
rootProject.name = 'trackingpost'
include(
"servicehome" //새로운 프로젝트명
)
  • root에 build.gradle 다음과같이 변경
    • allprojects는 root포함 모든 설정 공유
    • subprojects는 하위 모듈들이 영향받는것
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
plugins {
id 'org.springframework.boot' version '2.7.3'
id 'io.spring.dependency-management' version '1.0.13.RELEASE'
id 'java'
}
//java version
sourceCompatibility = '11'

allprojects{
group = 'com.lostcatbox'
version = '0.0.1-SNAPSHOT'

repositories {
mavenCentral()
}
}

configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
subprojects {
apply plugin: 'java'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.thymeleaf.extras:thymeleaf-extras-springsecurity5'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.security:spring-security-test'
//jpa 설치
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
//mysql 설치
runtimeOnly 'mysql:mysql-connector-java'

//webclient 설치
implementation 'org.springframework.boot:spring-boot-starter-webflux'
//jsoup 설치
implementation 'org.jsoup:jsoup:1.15.3'

}

tasks.named('test') {
useJUnitPlatform()
}
}
  • 모듈들은 따로 build.gradle을 가짐, 이때 ,dependencies 등등 설정은 root의 build.gradle설정 + 해당 모듈의 build.gradle로 컴파일된다.
  • 참고로 externalpost 모듈에 공통되는 core에 대한 클래스들을 사용한다면, 다음과 같이 root에 build.gradle파일에 추가하자
1
2
3
4
project(":externalpost"){
dependencies {
implementation project(":servicehome")
}
  • 아래분은 예시이다

    • core를 가져가셨고, 각 모듈에 build.gradle로 구성하기보다는 root에 build.gradle로 모든 구성을 하셨다.
    • compile 에러시 implementation으로 수정하기
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    buildscript {
    ext {
    springBootVersion = '2.6.2'
    }
    repositories {
    mavenCentral()
    }
    dependencies {
    classpath "org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}"
    classpath "io.spring.gradle:dependency-management-plugin:1.0.11.RELEASE"
    }
    }

    // 하위 모든 프로젝트 공통 세팅
    subprojects {
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group 'org.example'
    version '1.0-SNAPSHOT'

    sourceCompatibility = '1.8'
    targetCompatibility = '1.8'
    compileJava.options.encoding = 'UTF-8'

    repositories {
    mavenCentral()
    }

    // 하위 모듈에서 공통으로 사용하는 세팅 추가
    dependencies {
    compileOnly 'org.projectlombok:lombok'

    annotationProcessor 'org.projectlombok:lombok'
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
    }

    test {
    useJUnitPlatform()
    }
    }

    project(':velog-core') {
    // 공통 코드

    bootJar { enabled = false } // core 은 bootJar 로 패키징 할 필요 없음
    jar { enabled = true }

    dependencies {
    }
    }

    project(':velog-api-server') {
    bootJar { enabled = true }
    jar { enabled = false }

    dependencies {
    compile project(':velog-core') // 컴파일 시 velog-core project 로딩
    implementation 'org.springframework.boot:spring-boot-starter-web'
    }
    }

    project(':velog-batch-server') {
    bootJar { enabled = true }
    jar { enabled = false }

    dependencies {
    compile project(':velog-core') // 컴파일 시 velog-core project 로딩
    }
    }

오류 대응

문제점

  • JPA Repository가 정상적으로 DI되지 않는 현상
1
2
3
4
5
6
7
Description:

Parameter 0 of constructor in com.my.service.MyService required a bean of type 'com.my.jpa.repository.MyRepository' that could not be found.

Action:

Consider defining a bean of type 'com.my.jpa.repository.MyRepository' in your configuration.

원인

JPA 설정이 필요했다. 모듈로 오면서 trackingpost-core에 entity와 repository가 있었고, servicehome에서는 해당 repository를 DI 받을뿐이였다.

현재 servicehome —> trackingpost-core 으로 모듈로 core에 의존적인 형태였다.

이런 형태더라도, servicehome에서 main에 에너테이션으로 반드시 JPA에게 entity와 repository scan을 입력해주어야한다. 두가지를 명시해주자

  • @EnableJpaRepositories
  • @EntityScan
1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@EnableJpaAuditing
@EnableJpaRepositories(basePackages = {"com.example.trackingpostcore"}) //com.example.trackingpostcore 하위에 있는 jpaRepository를 상속한 repository scan
@EntityScan(basePackages = {"com.example.trackingpostcore"}) //com.example.trackingpostcore 하위에 있는 @Entity 클래스 scan
public class ServicehomeApplication {

public static void main(String[] args) {
SpringApplication.run(ServicehomeApplication.class, args);
}

}

기능별로 나누기

구성

Untitled

  • kakaochannel: kakaochannel에서 요청시 요청을 받고, 정형화한 데이터로 바로 링크를 return해주고, kafka로 externalpost로 택배조회 메세지 publish함
  • externalpost: 외부택배 조회를 위해 post하는 역할만 수행(kafka에서 consumer역할)
  • servicehome: 서비스 링크로 접속시, 조회처리 및 새로고침을 담당함(새로고침시 kafka에 externalpost로 택배조회 메세지 publish함)

externalpost

kafka에서 consumer로 동작하며, 메세지는 모두 post에 대해 새로운 update를 원하여 들어옴, RequestInfo객체로 들어옴

application.properties

1
2
3
4
5
6
# kafka property작성
bootstrap.servers=localhost:9092 //broker 서버
retries=0 //
batch.size=4096 //배치 크기
linger.ms=1
buffer.memory=40960 //버퍼 메모리

KafkaConsumerConfig

목적 : ConcurrentKafkaListenerContainerFactory클래스를 생성하고 ConsumerFactory인터페이스를 내부 멤버변수에 Set하고 Bean 으로 등록하기 위한 class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//목적 : ConcurrentKafkaListenerContainerFactory클래스를  생성하고 ConsumerFactory인터페이스를 내부 멤버변수에 Set하고 Bean 으로 등록하기 위한 class
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private Environment env;

@Autowired
KafkaConsumerConfig(Environment env) {
this.env = env;
}

// DefaultKafkaConsumerFactory 생성자로 설정정보를 만들어서 넣어준다.
@Bean
public ConsumerFactory<String, RequestInfo> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,env.getProperty("bootstrap.servers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //consumer 그룹지정
return new DefaultKafkaConsumerFactory<>(
props,new StringDeserializer(), new JsonDeserializer<>(RequestInfo.class)); //역직렬화
}

//consumerFactory()설정정보로 kafkaListenerContainerFactory에 등록한다
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RequestInfo> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, RequestInfo> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

Controller

KafkaListener로 함수등록!! 3가지 인자 지정해주기

  • topics
  • groupId
  • containerFactory

참고로 아래는 groupId가 다른 두개의 listener가있으므로 같은 토픽 메세지가 와도 동작을 따로한다!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Controller
public class PostController {
@Autowired
PostManager postManager;
@Autowired
PostDbService postDbService;

private static final String topicName = "posttopic";

//kafka로 해당 외부 택배조회 요청을 consumer 역할하는 것 추가
//containerFactory 지정해주기

@KafkaListener(topics = topicName, groupId="test", containerFactory = "kafkaListenerContainerFactory") //containerFactory중요
public void kafkaconsumer(RequestInfo requestInfo){
PostDto postDto = postManager.getpost(requestInfo); //해당 요청에 대해 post 정보 추출
if (!postDto.getPostNumber().isEmpty()){ //응답 문제시 빈 PostDto객체이므로 not_found 리턴

postDbService.savePost(postDto);
}
}
@KafkaListener(topics = topicName,groupId="logger", containerFactory = "kafkaListenerContainerFactory")
public void kafkaloger(RequestInfo requestInfo){
System.out.println(LocalDateTime.now());
System.out.println(requestInfo.toString());
}
}

kakaochannel

application.properties

1
2
3
4
5
6
# kafka property작성
bootstrap.servers=localhost:9092 //broker 서버
retries=0 //
batch.size=4096 //배치 크기
linger.ms=1
buffer.memory=40960 //버퍼 메모리

KafkaConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//kafkaTemplate 클래스를 Bean 으로 등록
@Configuration
public class KafkaConfiguration {
private Environment env;

@Autowired
KafkaConfiguration(Environment env) {
this.env = env;
}

//설정 정보를 Map<String,Object> 형식으로 작성해서 return 하는 일
@Bean
public Map<String,Object> producerConfig() {

Map<String,Object> props = new HashMap<>();

//server host 지정
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
,env.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

// retries 횟수
props.put(ProducerConfig.RETRIES_CONFIG
,env.getProperty(ProducerConfig.RETRIES_CONFIG));

//batcch size 지정
props.put(ProducerConfig.BATCH_SIZE_CONFIG
,env.getProperty(ProducerConfig.BATCH_SIZE_CONFIG));

// linger.ms
props.put(ProducerConfig.LINGER_MS_CONFIG
,env.getProperty(ProducerConfig.LINGER_MS_CONFIG));

//buufer memory size 지정
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG
,env.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG));

//key serialize 지정
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
, StringSerializer.class);

//value serialize 지정
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
,JsonSerializer.class);

return props;
}

// ProducerFactory를 return하는데 이때 위에서 만들어 놓은 producerConfig()의 설정 정보를 이용해서 ProducerFactory생성자를 호출
@Bean
public ProducerFactory<String, RequestInfo> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}

//최종적으로 우리가 Controller에서 사용 할 KafkaTemplate을 IOC 컨테이너에 등록하는 일을 하게되고 이때 위에서 만들어 놓은 producerFactory() 를 호출해서 KafkaTemplate생성자 Parameter로 넘겨준다.
@Bean
public KafkaTemplate<String, RequestInfo> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}

Controller

kafkaTemplate.send()를 활용하고, 반드시, topic과 data가 필요하다.(필요시 key도 넣을수있다)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
public class KakaoController {
@Autowired
ValidRequest validRequest;
@Autowired
private KafkaTemplate<String, RequestInfo> kafkaTemplate;

private static final String topicName = "posttopic";

@GetMapping(value = "/")
public String gethomepage(){
RequestInfo requestInfo = validRequest.getinfo(); // 요청에 대한 정보 추출

//kafka로 해당 요청 데이터 전송 to 외부 택배조회
kafkaTemplate.send(topicName, requestInfo);

return "localhost:8080"+"/"+requestInfo.getRequestUser()+"/"+requestInfo.getPostNumber()+"/";
}
}

servicehome

생략

kakaochannel을 참고로 똑같이 send가 필요한 부분에 적용시켜주면된다.

docker-compose적용하기

Untitled

  • docker-compse
  • 내부망으로 trackingpost_net을 사용하였고, 각자 container이름으로 접속하게함
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
version: "3"
services:
postdb:
container_name: postdb
image: mysql:8.0.29
environment:
MYSQL_DATABASE: tracking_post_db
MYSQL_USER: user
MYSQL_PASSWORD: Test2802!
MYSQL_ROOT_PASSWORD: Test2802!
ports:
- "3306:3306"
networks:
- trackingpost_net # 해당 네트워크로 편입
restart: always
kakaochannel:
container_name: kakaochannel
build: ./kakaochannel
ports:
- "8082:8082"
# working_dir: /app #컨테이너에서 명령어가 실행될 위치
command: ["java","-jar","/app.jar"] #jar 파일 실행
depends_on:
- postdb
networks:
- trackingpost_net # 해당 네트워크로 편입
restart: always

externalpost:
container_name: externalpost
build: ./externalpost
expose:
- "8081"
command: ["java","-jar","/app.jar"] #jar 파일 실행
depends_on:
- postdb
networks:
- trackingpost_net # 해당 네트워크로 편입
restart: always

servicehome:
container_name: servicehome
build: ./servicehome
ports:
- "8080:8080"
command: ["java","-jar","/app.jar"] #jar 파일 실행
depends_on:
- postdb
networks:
- trackingpost_net # 해당 네트워크로 편입
restart: always

zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
expose:
- "2181"
networks:
- trackingpost_net # 해당 네트워크로 편입
kafka:
image: wurstmeister/kafka
container_name: kafka
expose:
- "9092"
environment: #환경변수
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "posttopic"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 #{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092 kafka 브로커를 가리키는 사용 가능 주소로 초기연결시에 클라이언트에 전달되는 메타 데이터
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 리스너 들의 목록이고, 호스트/ip 로 구성한다. 해당 옵션을 사용하지 않으면 모든 인터페이스에서 수신 할 수 있다. 기본값. 0.0.0.0
depends_on:
- zookeeper
networks:
- trackingpost_net # 해당 네트워크로 편입

networks: #네트워크 새로 등록
trackingpost_net:
driver: bridge

발생한 오류들

graldew bootJar Fail

docker를 사용하여 서버를 올릴때는 jar파일로 올리기위해 jar로 패키징할려고하니까

문제가 graldew bootJar가 자꾸 fail이 떳다

원인

멀티모듈 환경에서 build를 시도했고, 모듈들의 각 build.gradle에 rootproject가 각자써져있었고, dependencies={} 도 입력되어있었다.

해결

각 build.gradle을 지워줬더니, 이제 root에있는 build.gradle만이 참조되어 build되므로 정상적으로 동작함.

아래와 같이 각 모듈들에 대해 실행해줬고, 그래서 jar가 정상적으로 생성됨(build/libs에 jar생성확인)

./gradlew bootJar -p ./servicehome

현재 전체 환경을 한번에 보기위해 root에다가 build.gradle만 관리하고있었고, 안에 필요한 모든것을 구성해놓음(개별 build.gradle필요없다)

core utils는 서버로 돌릴일없으니 jar =true, bootjar =false로 해놓는게 필수다.

Broker may not be available

1
2
3
externalpost    | 2022-09-14 11:46:28.677  WARN 1 --- [ntainer#0-0-C-1] 
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test]
Connection to node 1004 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

advertised.listeners=PLAINTEXT://0.0.0.0:9092

  • 0.0.0.0:9092는 server의 외부IP와 kafka의 포트번호입니다. 보안을 위하여 0.0.0.0으로 표기하였습니다.
  • advertised.listeners는 client가 server의 kafka(broker)에 접근하여 메시지를 발행하거나 소비할때 0.0.0.0:9092로 메시지에 접근하라는 일종을 알림입니다.

원인

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092

kafka 브로커를 가리키는 사용 가능 주소로 초기연결시에 클라이언트에 전달되는 메타 데이터

만약 advertised.listeners가 주석처리 되어있거나 PLAINTEXT://localhost:9092로되어있다면kafka는 client에게 localhost:9092에서 메시지를 발행하거나 소비한다.

client와 server가 같다면 상관없지만 현재 나같은 경우 외부 client이기에 localhost:9092로 접속하여도 메시지를 발행, 소비할 수 없다.

해결

그렇기 때문에 꼭 server의 외부IP를 지정해야합니다.

즉 전달되는 주소가 localhost였으니문제

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

kafka 컨테이너의 외부주소로 바꾸니 해결되었다ㅠㅠ

6

참조

https://okky.kr/articles/762929?note=2062525

내 작업폴더와 실행폴더 위치 항상확인하자

또..따로놀고있었다.

그니까 반영이안되지

CATALOG
  1. 1. Project 통합택배조회 api 03편 (두번째 구현)
  2. 2. 왜?
  3. 3. 고려한 아키택처
    1. 3.1. 첫번째
    2. 3.2. 두번째
    3. 3.3. 세번째(선택)
  4. 4. 구현
    1. 4.1. 멀티 모듈
    2. 4.2. root와 module 나누기(인텔리제이기준)
    3. 4.3. 오류 대응
      1. 4.3.1. 문제점
      2. 4.3.2. 원인
    4. 4.4. 기능별로 나누기
      1. 4.4.1. 구성
      2. 4.4.2. externalpost
      3. 4.4.3. kakaochannel
      4. 4.4.4. servicehome
    5. 4.5. docker-compose적용하기
  5. 5. 발생한 오류들
    1. 5.1. graldew bootJar Fail
      1. 5.1.1. 원인
      2. 5.1.2. 해결
    2. 5.2. Broker may not be available
      1. 5.2.1. 원인
      2. 5.2.2. 해결
      3. 5.2.3. 참조
  6. 6. 내 작업폴더와 실행폴더 위치 항상확인하자