본문 바로가기
ElasticSearch

엘라스틱서치 데이터 다루기 part 2 복수 문서 API

by Salt-Fn 2024. 7. 11.

2. 복수 문서 API

이번 포스트에서는 여러 문서를 한 번에 색인, 업데이트, 삭제할 때 사용하는 bulk API와 여러 문서를 한 번에 조회할 때 사용하는 multi get API에 대해 작성한다. 그리고 특정 문서를 대상으로 작업하는 update by query API와 delete by query API에 대해 작성한다.

2.1 bulk API

bulk API는 여러 색인, 업데이트, 삭제 작업을 한 번의 요청에 담아서 보내는 API다. bulk API는 엘라스틱서치의 다른 API와는 다르게 JSON이 아닌 NDJSON 형태로 만든다. NDJSON은 여러 줄의 JSON을 줄바꿈 문자로 구분하여 요청을 보낸다. Content-Type 헤더도 application/json 대신 application/x-ndjson을 사용해야 한다. 또한 가장 마자막 줄도 줄바꿈 문자 \n으로 끝나야 한다.

위 요청은 5개의 세부 요청으로 이루어져 있다.

 

첫번째 줄은 요청의 종류와 인덱스, _id, 라우팅 등을 지정한다. 해당 요청이 추가적인 요청 본문을 필요로 하는 경우 다음 줄에 이어서 기재한다. 예를 들어 색인 요청이라면 색인할 문서 내용을 다음 줄에 적는 방식이다.

 

{"index":{"_index":"bulk_test","_id":"1"}}
{"field":"value1"}
{"delete":{"_index":"bulk_test","_id":"2"}}
{"create":{"_index":"bulk_test","_id":"3"}}
{"field1":"value3"}
{"update":{"_id":"1","_index":"bulk_test"}}
{"doc":{"field2":"value2"}}
{"index":{"_index":"bulk_test","_id":"4","routing":"a"}}
{"field1":"value4"}

요청의 종류로는 index, create, update, delete가 사용됐다. index와 create는 색인 요청으로 다음 줄에 색인할 문서의 _source가 온다. create는 새 문서를 생성하는 것만 허용하고 기존 문서를 덮어쓰지 않는다. update는 업데이트 요청으로 다음 줄에 doc 또는 script를 기술하여 업데이트를 수행한다.

 

POST [index name]/_bulk 이런식으로 인덱스 이름을 넣으면 요청의 기본 대상이 해당 인덱스로 지정된다. 세부 요청에서 인덱스를 따로 적지 않는다면 이 기본 인덱스를 지정한 요청이 된다.

 

bulk API의 응답은 각 세부 요청을 수행하고 결과를 모아 하나의 응답으로 돌아온다.

{
  "errors": false,
  "took": 340,
  "items": [
    {
      "index": {
        "_index": "bulk_test",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "delete": {
        "_index": "bulk_test",
        "_id": "2",
        "_version": 1,
        "result": "not_found",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 1,
        "_primary_term": 1,
        "status": 404
      }
    },
    {
      "create": {
        "_index": "bulk_test",
        "_id": "3",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 2,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "update": {
        "_index": "bulk_test",
        "_id": "1",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 3,
        "_primary_term": 1,
        "status": 200
      }
    },
    {
      "index": {
        "_index": "bulk_test",
        "_id": "4",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 4,
        "_primary_term": 1,
        "status": 201
      }
    }
  ]
}

각 세부 여청의 결과 상태 코드를 status필드로 응답했음을 볼 수 있다. 두 번째 요청의 상태코드는 404다. 존재하지 않는 문서를 삭제하려고 했기 때문이다. 하지만 전체 응답의 상태 코드는 200이다. 클라이언트는 200을 받았다고 안심하지 말고 세부 응답을 파싱해서 원하는 대로 작업이 수행됐는지 확인해야 한다.

 

첫 번째 요청으로 인해 _id가 1인 문서가 생겼고 네 번째 요청에서 이 문서를 업데이트했다. 만약 요청 순서가 바꼈다면 요청이 실패했을 것이다. 작업 순서가 요청한 순서대로 보장될까?

  • bulk API의 작업 순서

항상 요청한 순서대로 작업을 한다는 보장은 없다. 조정 역할을 하는 노드가 요청을 수신하면 각 요청의 내용을 보고 적절한 주 샤드로 요청을 넘겨준다. 여러 개의 주 샤드에 넘어간 각 요청은 각자 독자적으로 수행된다. 따라서 요청 간 순서는 보장되지 않는다.

 

그러나 완전히 동일한 인덱스, _id, 라우팅 조합을 가진 요청은 반드시 동일한 주 샤드로 넘어간다. 따라서 한 bulk API 내에서 이 조합이 같은 요청, 즉 동일한 문서에 대한 요청은 bulk API에 기술된 순서대로 동작한다.

HTTP 요청을 청크로 보내면 안되는 이유

엘라스틱서치는 대량의 데이터를 효율적으로 처리하기 위해 내부적으로 여러 최적화 기법을 사용합니다. 이러한 최적화는 종종 대용량의 일괄 데이터(batch data)를 처리할 때 최적의 성능을 발휘합니다. 청크 단위의 작은 데이터는 이러한 최적화의 이점을 충분히 활용하지 못하게 되어, 상대적으로 낮은 성능을 나타낼 수 있습니다. by ChatGPT

2.2 multi get API

multi get API는 _id를 여럿 지정하여 해당 문서를 한 번에 조회하는 API다. 단건 조회 API를 반복해서 사용하는 것보다 성능이 좋다.

GET _mget
GET [인덱스 이름]/_mget

요청 본문에는 docs 필드 밑에 각 세부 조회 요청을 기술한다. 각 세부 요청은 _index, _id를 포함해야 하며 라우팅이나 특정 필드 포함 및 제거 옵션들을 함께 기술할 수 있다.

GET _mget
{
  "docs": [
    {
      "_index": "bulk_test",
      "_id": 1
    },
    {
      "_index": "bulk_test",
      "_id": 4,
      "routing": "a"
    },
    {
      "_index": "my_index2",
      "_id": "1",
      "_source": {
        "include": [
          "p*"
        ],
        "exclude": [
          "point"
        ]
      }
    }
  ]
}

응답은 요청했던 순서대로 문서의 내용을 모아 단일 응답으로 돌아온다.

만약 _mget 앞에 인덱스 이름을 명시했다면 bulk API 와 마찬가지로 명시한 인덱스가 기본으로 지정된다. 요청 본문에 _index를 생략할 수도 있다. 그런 경우 요청 본문에 docs가 아니라 ids만 기술해서 요청할 수 있다.

GET bulk_test/_mget
{
 "ids": ["1", "3"]
}

2.3 update by query

검색 쿼리를 통해 주어진 조건을 만족하는 문서를 찾은 뒤 그 문서를 대상으로 업데이트 작업을 실시하는 API다.

POST [인덱스 이름]/_update_by_query
{
 "script" : {
  "source": "...",
 },
 "query: {
  ...
 }
}

update by query API는 단건 업데이트 API와 달리 doc을 이용한 업데이트를 지원하지 않는다. script를 통한 업데이트만 지원한다.

 

위와 같이 호출하면 엘라스틱서치는 query 절의 검색 조건에 맞는 문서를 찾아 스냅샷을 찍는다. 이후 각 문서마다 지정된 스크립트에 맞게 업데이트를 실시한다. 여러 문서의 업데이트가 순차적으로 진행되는 도중 다른 작업으로 인해 문서에 변경이 생길 수 있다. update by query API는 이렇게 스냅샷을 찍어 뒀던 문서에서 변화가 생긴 문서를 발견하면 이를 업데이트하지 않는다. 버전 충돌 발생시 전체 작업을 해당 지점에서 그만둘 수도 있고 다음 작업으로 넘어갈 수도 있다. conflicts 매개변수를 지정하면 동작 방식을 지정할 수 있다.

abort 로 지정하면 충동 시 작업을 중단하고, proceed로 지정하면 다음 작업으로 넘어간다. 기본값은 abort다.

 

update by query를 이용한 업데이트 작업은 도중에 충돌이 났거나 다른 문제로 인해 중간에 작업이 중단되더라도 그때까지 업데이트된 내용이 롤백되거나 하지는 않는다. 중단 전까지 작업된 문서는 업데이트된 상태로 남는다.

 

  • 스로틀링

update by query는 문제가 생긴 데이터를 일관적으로 처리하거나 변경된 비즈니스 여건에 맞게 데이터를 일괄 수정하는 작업 등에 많이 활용된다. 하지만 이런 대량 작업을 수행하면 운영 중인 기존 서비스에도 영향을 줄 수 있다. 그러한 상황을 피하기 위해 스로틀링 기능이 있다.

POST bulk_test/_update_by_query?scroll_size=1000&scroll=1m&requests_per_seconds=500
{
 ...
}

update by query는 업데이트 전 먼저 검색을 수행한다. scroll_size를 지정하면 해당 값 만큼 문서를 가져오고 업데이트를 수행한다. 해당 작업이 완료되면 다음 1000개의 문서를 가져와서 작업을 수행한다.

 

update by query는 처음 검색 수행 시 스냅샷을 찍는데 스냅샵을 search context에 보존한다. 이 search context를 얼마나 보존할지 지정하는 것이 scroll이다. 모든 작업이 아닌 한 배치 작업에 필요한 시간을 지정하면 된다. 해당 시간 동안 scroll_size만큼 작업 처리가 가능해야 하므로 너무 짧은 시간을 지정하면 안된다. 그러나 스크롤로 인해 보존되고 있는 검색 문맥은 그만큼 자원을 소비한다. 힙 메모리와 디스크 공간, file descriptor를 차지하므로 스크롤이 과다하게 열려 있지 않도록 너무 큰 값을 지정하지 않아야 한다.

 

스로틀링을 실제 적용하기 위해 requests_per_second 설정을 이용한다. requests_per_second 설정은 이름 그대로 평균적으로 초당 몇 개까지의 작업을 수행할 것인가를 지정한다. 실제 동작은 일단 scroll_size 단위로 업데이트 작업을 수행한 뒤 requests_per_second 값에 맞도록 일정 시간을 대기하는 방식으로 진행한다. 예를 들어, scroll_size를 1000으로 지정하고 requests_per_second를 500으로 설정했다면 엘라스틱 서치는 2초마다 스크롤 한 번 분량만큼 업데이트한다. requests_per_second의 기본값은 -1로 스로틀링을 적용하지 않는 설정이다.

  • 비동기적 요청과 tasks API

엘라스틱서치에서 update by query 요청 시 wait_for_completion 매개변수를 false로 지정하면 비동기적 처리를 할 수 있다. 요청을 받으면 엘라스틱서치는 작업을 task로 등록한 뒤 task의 id가 포함된 응답을  반환한다. 이 값은 노드의 id와 해당 노드 내 task의 id를 : 으로 연결한 형태다. 이 값을 가지고 tasks API를 호출하여 작업의 진행 경과를 확인하거나 작업 취소 할 수있다.

 

모든 수행 중인 update by query는 wait_for_completion 값에 상관없이 task형태로 동작하며 tasks 조회 API를 통해 작업 직행을 확인할 수 있다. 다만 wait_for_completion을 false로 지정하면 진행 상황이 .tasks라는 내부 인덱스에 문서로 저장된다. 또한 작업이 종료된 후에도 작업 결과를 조회할 수 있다.

 

POST my-index-01/_update_by_query?wait_for_completion=false

 

요청 결과

GET .tasks/_doc/[task id]
GET _tasks/[task id]

위와 같은 요청으로 task의 상태를 확인할 수 있다.

POST _tasks/[task id]/_cancel

위와 같은 요청으로 task를 취소할 수 있다.

 

.tasks 인덱스에 등록된 task외 wait_for_completion=true로 지정해 호출한 작업도 GET _tasks를 호출하면 task목록에서 진행 중인 작업을 확인하고 취소할 수 있다.

POST _update_by_query/[task id]/_rethrottle?requests_per_second=[변경할 값]

위와 같은 방식으로 진행중인 task의 스로틀링을 동적으로 변경할 수 있다. _update_by_query의 경우 진행된 update의 결과를 롤백하지 않으므로 _update_by_query실행 도중 문제가 생겨 멈추더라도 추후 변경되지 않은 문서를 대상으로 다시 _update_by_query를 실행하면 된다. 하지만 _update_by_query를 멈출 수 없거나 진행하지 않은 문서를 필터하기 어려운 경우 위와 같은 쿼리를 통해 스로틀링을 동적으로 변경하면 된다.

 

DELETE .tasks/_doc/[task id]

작업의 상황을 충분히 확인 했다면 다음과 같이 .tasks 인덱스에서 문서를 삭제하면 좋다.

  • 슬라이싱

스로틀링을 적용해 부하를 줄이는 선택도 있지만 반대로 업데이트 성능을 최대로 끌어내 빠른 시간 안에 끝내는 선택지도 있는데 이때 슬라리싱을 이용한다.

 

 slices 매개변수를 지정하면 검색과 업데이트를 지정한 개수로 쪼개 병렬적으로 수행한다.

POST [인덱스 이름]/_update_by_query?slices=auto

slices의 기본값은 1로, 작업을  병렬로 쪼개지 않는다는 뜻이다. slices의 값을 auto로 지정하면 엘라스틱서치가 적절한 개수를 지정해서 작업을 병렬 수행한다. 보통은 지정한 인덱스의 주 샤드 수가 슬라이스의 수가 된다. 여러 인덱스를 연결해서 지정한 경우 가장 적은 주 샤드를 가진 인덱스를 기준으로 슬라이스의 수가 지정된다.

 

auto가 아닌 직접 숫자를 지정할 수도 있다. 이 경우 슬라이스의 수가 주 샤드 수를 넘는 경우 성능이 급감할 수 있다. 슬라이스의 수가 샤드 수 보다 많으면 필연적으로 한 샤드 내의 데이터를 여러 슬라이스에 분배하는 작업이 필요해진다.

 

슬라이싱은 기본적으로 샤드를 기준으로 작업을 쪼개기 때문에 각 요청 슬라이스가 동일한 작업량을 배분받지 않는다. 또한 requests_per_second 옵션은 각 슬라이스에 쪼개져서 적용된다. 예를 들어, requests_per_second가 1000, 슬라이스 개수가 5라면 각 슬라이스는 requests_per_second 200씩 분배받는다.

 

슬라이싱 적용하여 update by query를 수행 후 GET _tasks로 실행 중인 tasks를 조회하면 task사이에 부모 자식 관계가 설정된 것을 볼 수 있다.

2.4 delete by query

delete by query는 update by query 처럼 먼저 지정한 검색 쿼리로 삭제할 대상을 지정 후 삭제를 수행하는 작업이다.

 

delete by query 역시 update by query와 마찬가지로 검색 조건에 맞는 문서를 찾아 스냅샷을 찍는다. 삭제  작업이 진행되는 동안 문서의 내용이 변경됐다면 버전 충돌이 일어나는 부분도 update by query와 동일하다. 버전 충돌 시 작업 여부를 지정하는 conflicts 매개변수, .tasks인덱스에 작업 등록과 비동기적 실행을 위한 wait_for_completion 매개변수, tasks API를 통한 관리, 스로틀링 적용, 슬라이싱 적용 등과 관련된 내용도 update by query와 동일하다