본문 바로가기
ElasticSearch

Java Client를 이용한 Elasticsearch 쿼리

by Salt-Fn 2025. 3. 29.

Spring boot 에서 Java Client를 이용하여 Elasticsearch 쿼리를 작성해볼려고 한다. GPT에 물어봤는데도 이상한 답을 줘서 혼자 이것저것 해봤는데 이게 맞는 방법인지 잘 모르겠다.

 

Elasticsearch 클러스터 구성은 다른 포스트에서 작성했으니 이미 클러스터가 있다는 가정하에 작성해 보겠다.

 

우선 의존성을 추가하자.

implementation 'co.elastic.clients:elasticsearch-java:8.14.3'

버전은 사용하고 있는 elasticsearch 버전에 맞춰 변경해야한다.

 

Elasticsearch 8.0 버전 이상 부터는 java clinet의 사용이 권장된다. High level rest client 는 7.15 버전부터 deprecated로 지정되었다. 현재 필자가 설치해서 공부하고 있는 버전은 8.14.3 이므로 java client를 이용하여 쿼리를 작성하였다.

 

우선은 Elasticsearch에 요청을 보내기 위해 설정을 하자. 아래 코드는 엘라스틱서치 바이블 책의 kotlin 코드를 java로 작성한 것이다.

@Configuration
public class ElasticsearchConfig {
    @Value("${spring.elasticsearch.username}")
    private String username;

    @Value("${spring.elasticsearch.password}")
    private String password;

    @Value("${caPath}")
    private String certificationPath;

    @Value("${spring.elasticsearch.uris}")
    private String[] exHost;

    private final int FLUSH_INTERVAL = 1;
    private final int MAX_OPERATION = 10000;

    @Bean
    public RestClient buildClient() throws Exception {
        RestClientBuilder restClientBuilder = RestClient.builder(
                new HttpHost("127.0.0.1", 9203, "https")
        );

        restClientBuilder.setDefaultHeaders(
                new BasicHeader[]{new BasicHeader("my-header", "my-value")}
        );

        restClientBuilder.setRequestConfigCallback(
                requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(5000)
                        .setSocketTimeout(70000)
        );

        String caPath = certificationPath;
        CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        X509Certificate trustedCa;

        try (FileInputStream fis = new FileInputStream(caPath)) {
            trustedCa = (X509Certificate) certificateFactory.generateCertificate(fis);
        }

        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);

        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
                .setSSLContext(sslContext)
                .setDefaultCredentialsProvider(credentialsProvider))
        ;

        return restClientBuilder.build();
    }

    @Bean
    RestClientTransport restClientTransport(RestClient restClient, ObjectProvider<RestClientOptions> restClientOptions) {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());

        return new RestClientTransport(restClient, new JacksonJsonpMapper(mapper), restClientOptions.getIfAvailable());
    }

    @Bean
    public ElasticsearchClient elasticsearchClient(RestClientTransport restClientTransport) {
        return new ElasticsearchClient(restClientTransport);
    }

    @Bean
    public ElasticsearchAsyncClient elasticsearchAsyncClient(RestClientTransport restClientTransport) {
        return new ElasticsearchAsyncClient(restClientTransport);
    }

    @Bean
    public BulkIngester<BulkOperation> bulkIngester(ElasticsearchClient client, BulkIngestListenerImpl<BulkOperation> listener) {
        return BulkIngester.of(b -> b
                .client(client)
                .flushInterval(FLUSH_INTERVAL, TimeUnit.SECONDS)
                .maxOperations(MAX_OPERATION)
                .listener(listener));
    }

    @Bean
    public BulkIngestListenerImpl<BulkOperation> bulkIngestListener() {
        return new BulkIngestListenerImpl<>();
    }
}

하나씩 살펴보자.

 

@Value("${spring.elasticsearch.username}")
private String username;

@Value("${spring.elasticsearch.password}")
private String password;

@Value("${caPath}")
private String certificationPath;

@Value("${spring.elasticsearch.uris}")
private String[] exHost;

private final int FLUSH_INTERVAL = 1;
private final int MAX_OPERATION = 10000;

 

@Value는 설정파일에서 값을 주입받는다.

username, password는 elasticsearch 접근을 위한 값이다. certificationPath 변수는 elasticsearch 클러스터를 구성할 때 자동 보안설정을 진행하며 발급받았던 CA 인증서의 경로다. exHost는 elasticsearch 서버의 URI 목록이다.

FLUSH_INTERVAL과 MAX_OPERATION은 elasticsearch에 벌크 연산을 할 때 사용할 값이다. 각각 플러시 주기(초 단위), 한 번에 처리할 최대 연산 개수이다.

 

RestClinet 생성

@Bean
public RestClient buildClient() throws Exception {
    RestClientBuilder restClientBuilder = RestClient.builder(
        new HttpHost("127.0.0.1", 9203, "https")
    );

 

elasticsearch의 ip, port, 프로토콜을 넣어 RestClinet를 생성하고 bean 객체로 등록한다. 현재 클러스터는 1개의 coordinating node, 2개의 data node, 1개의 master node를 사용중이다. 쿼리 요청은 coordinating node에 해야하므로 coordinating의 정보를 넣어준다.

 

restClientBuilder.setDefaultHeaders(
    new BasicHeader[]{new BasicHeader("my-header", "my-value")}
);

 

모든 요청에 들어간 헤더 값을 넣어준다.

restClientBuilder.setRequestConfigCallback(
    requestConfigBuilder -> requestConfigBuilder
        .setConnectTimeout(5000)
        .setSocketTimeout(70000)
);
  • setConnectTimeout: 연결 시도 시간
  • setSocketTimeout: 데이터 수신 대기 시간 설정
String caPath = certificationPath;
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
X509Certificate trustedCa;

try (FileInputStream fis = new FileInputStream(caPath)) {
    trustedCa = (X509Certificate) certificateFactory.generateCertificate(fis);
}

 

CA 인증서를 로드

  • CertificateFactory: X.509 형식의 인증서를 로드
  • trustedCa: 신뢰할 수 있는 인증서로 등록
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);

SSLContext sslContext = SSLContexts.custom()
    .loadTrustMaterial(trustStore, null)
    .build();

 

KeyStore 설정

  • KeyStore: 인증서 저장소로 PKCS12 형식을 사용
  • trustStore.setCertificateEntry(...): 인증서를 KeyStore에 추가
  • SSLContext: HTTPS 연결을 위한 SSL 설정을 구성
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, 
    new UsernamePasswordCredentials(username, password));

restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
    .setSSLContext(sslContext)
    .setDefaultCredentialsProvider(credentialsProvider));

return restClientBuilder.build();

 

인증 설정

  • BasicCredentialsProvider: Elasticsearch의 Basic Auth에 필요한 자격 증명 설정
  • setSSLContext: HTTPS 연결에 사용할 SSL 설정 추가
  • setDefaultCredentialsProvider: 사용자 인증 정보 추가

이제 Elasticsearch 클라이언트 생성을 하였다.

@Bean
RestClientTransport restClientTransport(RestClient restClient, ObjectProvider<RestClientOptions> restClientOptions) {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new JavaTimeModule());

    return new RestClientTransport(restClient, new JacksonJsonpMapper(mapper), restClientOptions.getIfAvailable());
}

 

RestClientTransport 생성

  • JacksonJsonpMapper: JSON 직렬화를 위한 ObjectMapper 설정
  • JavaTimeModule: Java 8 날짜/시간 (LocalDateTime 등) 처리를 위한 모듈

ObjectMapper는 Jackson 라이브러리의 클래스로 Java 객체 <-> Json 데이터간 직렬화와 역직렬화를 수행한다.

Java Client는 기본적으로 JacksonJsonpMapper를 통해 데이터를 처리하며, ObjectMapper가 내부에서 사용된다. java.time 패키지의 날짜/시간 타입은 기본적으로 Jackson에서 직렬화와 역직렬화가 지원되지 않아 JavaTimeModule을 등록해야 Json으로 올바르게 변환된다.

 

위 RestClinet 작성은 링크를 통해서도 확인할 수 있다.

@Bean
public ElasticsearchClient elasticsearchClient(RestClientTransport restClientTransport) {
    return new ElasticsearchClient(restClientTransport);
}

@Bean
public ElasticsearchAsyncClient elasticsearchAsyncClient(RestClientTransport restClientTransport) {
    return new ElasticsearchAsyncClient(restClientTransport);
}

 

Elasticsearch Client 생성

  • ElasticsearchClient: 동기 클라이언트 (일반적인 CRUD, 검색 용도).
  • ElasticsearchAsyncClient: 비동기 클라이언트 (비동기 처리 필요 시 사용).
@Bean
public BulkIngester<BulkOperation> bulkIngester(ElasticsearchClient client, BulkIngestListenerImpl<BulkOperation> listener) {
    return BulkIngester.of(b -> b
        .client(client)
        .flushInterval(FLUSH_INTERVAL, TimeUnit.SECONDS)
        .maxOperations(MAX_OPERATION)
        .listener(listener));
}

 

BulkIngester 설정

  • 대규모 데이터 삽입 시 성능 최적화를 위한 설정
  • flushInterval: 데이터가 주기적으로 Elasticsearch에 저장
  • maxOperations: 한 번에 처리할 최대 작업 수 제한
@Bean
public BulkIngestListenerImpl<BulkOperation> bulkIngestListener() {
    return new BulkIngestListenerImpl<>();
}

 

Bulk 처리 이벤트 리스너 설정

  • Bulk 처리가 완료되거나 오류 발생 시 알림을 처리

이제 설정은 끝났으니 Java clinet를 이용한 쿼리를 작성해보자. 프로젝트를 진행하며 사용한 쿼리만 작성할 예정이지만 kibana에서 작성하는 json 형태의 쿼리와 유사하니 다른 쿼리 작성도 할 수 있을거라 생각한다.

 

검색

@Override
    public List<SearchedLectureResponse> search(String q, String f) throws IOException {
        SearchRequest searchRequest = SearchRequest.of(s -> s
                .index("lecture")
                .query(query -> query
                        .bool(bool -> bool
                                .must(m -> m
                                        .match(match -> match
                                                .field(f)
                                                .query(q)))
                                .filter(m -> m
                                        .range(r -> r.field("date")
                                                .from(LocalDate.now().toString())
                                                .to(LocalDate.now().toString())))))
                .routing(q));


        SearchResponse<LectureDocument> lectures = client.search(searchRequest, LectureDocument.class);

        return lectures.hits().hits().stream()
                .map(Hit::source).filter(Objects::nonNull)
                .map(SearchedLectureResponse::toDto)
                .collect(Collectors.toList());
    }

 

간단한 검색 쿼리다.

 

index에 검색 대상 index의 이름을 넣는다. 기본적으로 java clinet의 쿼리는 람다를 이용한다. 위 쿼리는 lecture 인덱스에서 검색하고 bool 복합 쿼리를 이용한다. must를 이용해 필수 조건을 넣을 수 있다. f 필드 값에서 q 값과 일치하는 문서를 찾는다. 추가적으로 range를 이용해 날짜 값을 추가하였다. 위 쿼리는 오늘 날짜인 문서만 검색한다. 날짜 범위 검색은 점수 계산이 필요 없으므로 filter를 이용했다. 마지막으로 routing으로 특정 샤드로 요청을 라우팅한다. 위 검색 쿼리는 id를 이용한 검색이므로 id를 키로 라우팅을 하게 되는 것이다. 라우팅에 대한 정보는 포스팅을 살펴보자.

 

return의 stream은 목표하는 데이터 source를 찾고 원하는 형태의 데이터로 매핑하여 list로 만들고 있다.

 

검색 with searchAfter

이번엔 searchAfter를 이용한 검색이다. 간단하게 구현하기 위해 searchAfter를 활용하기 위한 정보는 session에 저장하고 꺼내고 있다.

public List<SearchedLectureResponse> lectureList(HttpSession session) throws IOException {
        SearchRequest searchRequest;
        List<FieldValue> searchAfter = getSearchAfterFromSession(session);

        if (searchAfter == null) {
            searchRequest = searchRequestInit();
        } else {
            searchRequest = searchRequestWithSearchAfter(searchAfter);
        }

        SearchResponse<LectureDocument> lectures = client.search(searchRequest, LectureDocument.class);

        List<FieldValue> newSearchAfter = getLastHitSortValues(lectures);
        session.setAttribute("searchAfter", newSearchAfter);

        return lectures.hits().hits().stream()
                .map(Hit::source).filter(Objects::nonNull)
                .map(SearchedLectureResponse::toDto)
                .collect(Collectors.toList());
}
private List<FieldValue> getSearchAfterFromSession(HttpSession session) {
        Object sessionAttr = session.getAttribute("searchAfter");

        if (sessionAttr instanceof List<?>) {
            return (List<FieldValue>) sessionAttr;
        }

        return null;
    }
private SearchRequest searchRequestInit() {
        String today = LocalDate.now().format(DateTimeFormatter.ISO_DATE);

        return SearchRequest.of(s -> s
                .index("lecture")
                .size(20)
                .query(q -> q
                        .bool(b -> b
                                .filter(f -> f
                                        .term(t -> t
                                                .field("date")
                                                .value(today)))))
                .sort(so -> so
                        .field(f -> f.field("ordinaryPrice").order(SortOrder.Asc))
                ).sort(so -> so
                        .field(f -> f.field("lectureId").order(SortOrder.Asc)))
        );
    }
private SearchRequest searchRequestWithSearchAfter(List<FieldValue> searchAfter) {
        String today = LocalDate.now().format(DateTimeFormatter.ISO_DATE);

        return SearchRequest.of(s -> s
                .index("lecture")
                .size(20)
                .query(q -> q
                        .bool(b -> b
                                .filter(f -> f
                                        .term(t -> t
                                                .field("date")
                                                .value(today)))))
                .sort(so -> so
                        .field(f -> f.field("ordinaryPrice").order(SortOrder.Asc))
                ).sort(so -> so
                        .field(f -> f.field("lectureId").order(SortOrder.Asc)))
                .searchAfter(searchAfter)
        );
    }
private List<FieldValue> getLastHitSortValues(SearchResponse<LectureDocument> searchResponse) {
        List<Hit<LectureDocument>> hits = searchResponse.hits().hits();
        if (hits.isEmpty()) {
            return null;
        }
        return hits.get(hits.size() - 1).sort();
    }

각 메서드의 역할을 보자.

 

 

  • lectureList: elasticsearch에서 강의 리스트를 가져와 반환한다.
  • getSearchAfterFromSession: session에서 "searchAfter"을 키로 값을 꺼내고 있다. 만약 해당 키의 값이 없다면 null을 반환한다.
  • searchRequestInit: session에 "searchAfter"에 해당하는 값이 없다면 searchAfter 없이 검색 쿼리를 작성하였다.
  • searchRequestWithSearchAfter: 쿼리에 searchAfter 를 추가하여 검색한다.
  • getLastHitSortValues: 검색한 문서 리스트의 마지막 항목의 값을 구한다. 아래에서 설명하겠지만 쿼리에서 두개의 sort를 볼 수 있는데 하나는 ordinaryPrice, 다른 하나는 lectureId를 이용한 오름차순 정렬을 하고 있다. searchAfter는 이전 검색 결과에서 마지막 정렬된 항목을 기준으로 다음 페이지를 가져오게 된다. 이 방식에서는 중복항목을 방지하기 위해 고유한 키 값이 필요하다. 여기서 고유한 키 값은 lectureId가 된다. 최종적으로 List<FieldValue>에는 ordinaryPrice, lectureId를 이용한 정렬에서 마지막 항목을 가지게 된다.

이제 쿼리를 보자

String today = LocalDate.now().format(DateTimeFormatter.ISO_DATE);

SearchRequest.of(s -> s
                .index("lecture")
                .size(20)
                .query(q -> q
                        .bool(b -> b
                                .filter(f -> f
                                        .term(t -> t
                                                .field("date")
                                                .value(today)))))
                .sort(so -> so
                        .field(f -> f.field("ordinaryPrice").order(SortOrder.Asc)))
                        .sort(so -> so
                        .field(f -> f.field("lectureId").order(SortOrder.Asc)))
        );

lecture index에서 오늘 날짜의 문서를 20개 가져온다. ordinaryPrice로 오름차순, lectureId로 오름차순을 하고 있다. 위에서도 말했지만 searchAfter를 이용하기 위해선 중복방지를 위한 고유한 값이 필요하다. 여기서는 lectureId를 이용하고 있다.

String today = LocalDate.now().format(DateTimeFormatter.ISO_DATE);

SearchRequest.of(s -> s
                .index("lecture")
                .size(20)
                .query(q -> q
                        .bool(b -> b
                                .filter(f -> f
                                        .term(t -> t
                                                .field("date")
                                                .value(today)))))
                .sort(so -> so
                        .field(f -> f.field("ordinaryPrice").order(SortOrder.Asc)))
                        .sort(so -> so
                        .field(f -> f.field("lectureId").order(SortOrder.Asc)))
                .searchAfter(searchAfter)
        );

방금 쿼리와 유사하지만 마지막에 searchAfter가 추가되었다. 해당 값을 기준으로 다음 20개를 가져오게 된다.

 

집계

이번에는 집계 쿼리다.

public List<LectureYearPriceResponse> lecturePriceYearAgg(String lectureId) throws IOException {
        SearchRequest searchRequest = SearchRequest.of(s -> s
                .index("lecture")
                .size(0)
                .query(q -> q
                        .bool(b -> b
                                .must(m -> m
                                        .term(t -> t
                                                .field("lectureId")
                                                .value(lectureId)))
                                .filter(f -> f
                                        .range(r -> r
                                                .field("date")
                                                .from("now-1y/d")
                                                .to("now")))))
                .aggregations("date-range-aggs", a -> a
                        .dateHistogram(dh -> dh
                                .field("date")
                                .calendarInterval(CalendarInterval.Month))
                        .aggregations("min_price", aa -> aa
                                .min(m -> m.field("salePrice")))));

        List<LectureYearPriceResponse> list = new ArrayList<>();
        SearchResponse<LectureDocument> response = client.search(searchRequest, LectureDocument.class);

        for (DateHistogramBucket s : response.aggregations().get("date-range-aggs").dateHistogram().buckets().array()) {
            list.add(new LectureYearPriceResponse(LocalDate.parse(s.keyAsString()),
                    (int) s.aggregations().get("min_price").min().value()));
        }

        return list;
    }

lectureId를 대상으로 집계하는 쿼리다. 해당 lectureId의 1년치 문서 중 각 월의 "salePrice" 중 최소값을 집계하고 있다.

일/월 1월 2월 3월 4월 5월 6월 7월 8월 9월 10월 11월 12월
1일 10 10 10 10 9 9 9 9 9 10 1 5
2일 11 11 11 11 10 10 10 10 10 9 2 6

위와 같은 표를 예로 들면 1월의 최소값인 10, 2월의 최소값 10.....5월의 최소값 9, 6월의 최소값 9.... 10월의 최소값 9... 11월은 1, 12월은 5 와 같은 값을 집계하게 된다.

 

size의 값이 0인 이유는 집계한 값의 결과만 필요하고 검색된 문서는 필요없으므로 0으로 했다. 집계는 aggregations에 작성하게 된다.

첫번째 값으로 집계되어 반환된 SearchResponse에서 원하는 집계 값을 찾기위한 key를 넘겨준다. 여기서는 "date-range-aggs"다.

다음으로 dateHistogram을 이용해 lectureId에 해당하는 월별 문서를 가져오고 해당 문서에서 salePrice가 가장 작은 문서를 반환하게 된다.

 

이상으로 java client를 이용해 elasticsearch에 쿼리를 보내는 방법을 알아봤다. 검색과 집계 쿼리를 간단하게 알아봤는데 kibana에서 작성하는 쿼리와 유사한 형태여서 다른 쿼리를 작성하는것도 어렵지 않을거라 생각한다.

 

다음 포스팅은 filebeat를 이용한 로그 수집이 될 것 같다. 이상으로 포스팅을 마치겠다.

 

* 위 쿼리들은 프로젝트를 진행하며 작성한 쿼리들입니다. 참고자료가 많지않아 올바르게 작성된 쿼리인지는 잘 모르겠습니다... 물론 전부 실행은 해본 쿼리고 원하는대로 작동은 하였지만 만약 잘못된 곳이 있다면 댓글로 알려주시면 수정하겠습니다.